This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 4722514e Test distributed metadata (#329)
4722514e is described below
commit 4722514eeb64bad04f0e36cc260740c8968edf39
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 14 22:20:52 2023 +0800
Test distributed metadata (#329)
---
banyand/cmd/server/main.go | 5 +-
banyand/dquery/dquery.go | 15 ++++
banyand/dquery/measure.go | 2 +-
banyand/dquery/stream.go | 2 +-
banyand/measure/metadata.go | 7 +-
banyand/metadata/client.go | 7 +-
banyand/metadata/schema/etcd.go | 60 ++++++++++-----
banyand/metadata/schema/etcd_test.go | 2 +-
banyand/metadata/schema/group.go | 33 +++-----
banyand/metadata/schema/node.go | 3 +-
banyand/metadata/schema/property.go | 5 +-
banyand/metadata/schema/register_test.go | 9 ++-
banyand/metadata/schema/schema_suite_test.go | 37 ---------
banyand/metadata/schema/watcher.go | 14 +++-
banyand/metadata/schema/watcher_test.go | 30 ++++++++
banyand/observability/pprof.go | 10 ++-
banyand/observability/service.go | 6 ++
banyand/queue/local.go | 4 +
banyand/queue/pub/client.go | 13 +++-
banyand/queue/queue.go | 1 +
banyand/queue/sub/server.go | 4 +-
banyand/stream/metadata.go | 8 +-
.../cmd/storage.go => pkg/cmdsetup/data.go | 60 +++++----------
{banyand/internal/cmd => pkg/cmdsetup}/liaison.go | 18 ++---
{banyand/internal/cmd => pkg/cmdsetup}/root.go | 13 ++--
.../internal/cmd => pkg/cmdsetup}/standalone.go | 15 ++--
pkg/logger/logger.go | 2 +-
pkg/logger/setting.go | 8 +-
pkg/test/helpers/etcd.go | 66 ++++++++++++++++
pkg/test/setup/setup.go | 21 +++++
pkg/test/stream/etcd.go | 3 +-
scripts/build/lint.mk | 2 +-
test/integration/distributed/setup/node_test.go | 89 ++++++++++++++++++++++
.../distributed/setup/setup_suite_test.go | 60 ++++++++-------
.../cold_query/query_suite_test.go | 3 +-
.../{ => standalone}/other/measure_test.go | 0
.../{ => standalone}/other/other_suite_test.go | 3 +-
.../{ => standalone}/other/property_test.go | 0
.../other/testdata/server_cert.pem | 0
.../{ => standalone}/other/testdata/server_key.pem | 0
.../integration/{ => standalone}/other/tls_test.go | 0
.../{ => standalone}/query/query_suite_test.go | 3 +-
.../integration/standalone/standalone.go | 19 +----
43 files changed, 449 insertions(+), 213 deletions(-)
diff --git a/banyand/cmd/server/main.go b/banyand/cmd/server/main.go
index 8906f84d..99827d13 100644
--- a/banyand/cmd/server/main.go
+++ b/banyand/cmd/server/main.go
@@ -22,11 +22,12 @@ import (
"fmt"
"os"
- "github.com/apache/skywalking-banyandb/banyand/internal/cmd"
+ "github.com/apache/skywalking-banyandb/pkg/cmdsetup"
+ "github.com/apache/skywalking-banyandb/pkg/signal"
)
func main() {
- if err := cmd.NewRoot().Execute(); err != nil {
+ if err := cmdsetup.NewRoot(new(signal.Handler)).Execute(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index 61f490c8..4d77b6fb 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -35,12 +35,15 @@ const (
moduleName = "distributed-query"
)
+var _ run.Service = (*queryService)(nil)
+
type queryService struct {
log *logger.Logger
metaService metadata.Repo
sqp *streamQueryProcessor
mqp *measureQueryProcessor
tqp *topNQueryProcessor
+ closer *run.Closer
}
// NewService return a new query service.
@@ -48,6 +51,7 @@ func NewService(metaService metadata.Repo, broadcaster
bus.Broadcaster,
) (run.Unit, error) {
svc := &queryService{
metaService: metaService,
+ closer: run.NewCloser(1),
}
svc.sqp = &streamQueryProcessor{
queryService: svc,
@@ -75,6 +79,17 @@ func (q *queryService) PreRun(_ context.Context) error {
return nil
}
+func (q *queryService) GracefulStop() {
+ q.sqp.streamService.Close()
+ q.mqp.measureService.Close()
+ q.closer.Done()
+ q.closer.CloseThenWait()
+}
+
+func (q *queryService) Serve() run.StopNotify {
+ return q.closer.CloseNotify()
+}
+
var _ executor.DistributedExecutionContext = (*distributedContext)(nil)
type distributedContext struct {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index 522a9cda..b145de94 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -31,7 +31,7 @@ import (
)
type measureQueryProcessor struct {
- measureService measure.Query
+ measureService measure.SchemaService
broadcaster bus.Broadcaster
*queryService
}
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 3a92aaeb..62804d00 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -31,7 +31,7 @@ import (
)
type streamQueryProcessor struct {
- streamService stream.Query
+ streamService stream.SchemaService
broadcaster bus.Broadcaster
*queryService
}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 1185442b..26a6b648 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -42,6 +42,11 @@ import (
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
+// SchemaService allows querying schema information.
+type SchemaService interface {
+ Query
+ Close()
+}
type schemaRepo struct {
resourceSchema.Repository
l *logger.Logger
@@ -65,7 +70,7 @@ func newSchemaRepo(path string, metadata metadata.Repo,
}
// NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) Query {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger)
SchemaService {
r := &schemaRepo{
l: l,
metadata: metadata,
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 33c258be..d1271f20 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -41,6 +41,7 @@ func NewClient(_ context.Context) (Service, error) {
}
type clientService struct {
+ namespace string
schemaRegistry schema.Registry
alc *allocator
closer *run.Closer
@@ -53,6 +54,7 @@ func (s *clientService) SchemaRegistry() schema.Registry {
func (s *clientService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
+ fs.StringVar(&s.namespace, "namespace", "banyandb", "The namespace of
the metadata stored in etcd")
fs.StringArrayVar(&s.endpoints, flagEtcdEndpointsName,
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
return fs
}
@@ -66,7 +68,10 @@ func (s *clientService) Validate() error {
func (s *clientService) PreRun(ctx context.Context) error {
var err error
- s.schemaRegistry, err =
schema.NewEtcdSchemaRegistry(schema.ConfigureServerEndpoints(s.endpoints))
+ s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
+ schema.Namespace(s.namespace),
+ schema.ConfigureServerEndpoints(s.endpoints),
+ )
if err != nil {
return err
}
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index aff8e9ad..f53a718c 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -20,13 +20,13 @@ package schema
import (
"context"
"fmt"
+ "path"
"sync"
"time"
"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
- "google.golang.org/grpc"
"google.golang.org/protobuf/proto"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -55,6 +55,13 @@ type HasMetadata interface {
// RegistryOption is the option to create Registry.
type RegistryOption func(*etcdSchemaRegistryConfig)
+// Namespace sets the namespace of the registry.
+func Namespace(namespace string) RegistryOption {
+ return func(config *etcdSchemaRegistryConfig) {
+ config.namespace = namespace
+ }
+}
+
// ConfigureServerEndpoints sets a list of the server urls.
func ConfigureServerEndpoints(url []string) RegistryOption {
return func(config *etcdSchemaRegistryConfig) {
@@ -63,14 +70,16 @@ func ConfigureServerEndpoints(url []string) RegistryOption {
}
type etcdSchemaRegistry struct {
- client *clientv3.Client
- closer *run.Closer
- l *logger.Logger
- watchers []*watcher
- mux sync.RWMutex
+ namespace string
+ client *clientv3.Client
+ closer *run.Closer
+ l *logger.Logger
+ watchers []*watcher
+ mux sync.RWMutex
}
type etcdSchemaRegistryConfig struct {
+ namespace string
serverEndpoints []string
}
@@ -124,7 +133,6 @@ func NewEtcdSchemaRegistry(options ...RegistryOption)
(Registry, error) {
DialTimeout: 5 * time.Second,
DialKeepAliveTime: 30 * time.Second,
DialKeepAliveTimeout: 10 * time.Second,
- DialOptions: []grpc.DialOption{grpc.WithBlock()},
Logger: l,
}
client, err := clientv3.New(config)
@@ -132,18 +140,27 @@ func NewEtcdSchemaRegistry(options ...RegistryOption)
(Registry, error) {
return nil, err
}
reg := &etcdSchemaRegistry{
- client: client,
- closer: run.NewCloser(1),
- l: log,
+ namespace: registryConfig.namespace,
+ client: client,
+ closer: run.NewCloser(1),
+ l: log,
}
return reg, nil
}
+func (e *etcdSchemaRegistry) prependNamespace(key string) string {
+ if e.namespace == "" {
+ return key
+ }
+ return path.Join("/", e.namespace, key)
+}
+
func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message
proto.Message) error {
if !e.closer.AddRunning() {
return ErrClosed
}
defer e.closer.Done()
+ key = e.prependNamespace(key)
resp, err := e.client.Get(ctx, key)
if err != nil {
return err
@@ -177,6 +194,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context,
metadata Metadata) erro
if err != nil {
return err
}
+ key = e.prependNamespace(key)
getResp, err := e.client.Get(ctx, key)
if err != nil {
return err
@@ -228,6 +246,7 @@ func (e *etcdSchemaRegistry) create(ctx context.Context,
metadata Metadata) erro
if err != nil {
return err
}
+ key = e.prependNamespace(key)
getResp, err := e.client.Get(ctx, key)
if err != nil {
return err
@@ -255,6 +274,7 @@ func (e *etcdSchemaRegistry) listWithPrefix(ctx
context.Context, prefix string,
return nil, ErrClosed
}
defer e.closer.Done()
+ prefix = e.prependNamespace(prefix)
resp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
@@ -279,6 +299,7 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context,
metadata Metadata) (boo
if err != nil {
return false, err
}
+ key = e.prependNamespace(key)
resp, err := e.client.Delete(ctx, key, clientv3.WithPrevKV())
if err != nil {
return false, err
@@ -298,6 +319,7 @@ func (e *etcdSchemaRegistry) register(ctx context.Context,
metadata Metadata) er
if err != nil {
return err
}
+ key = e.prependNamespace(key)
val, err := proto.Marshal(metadata.Spec.(proto.Message))
if err != nil {
return err
@@ -349,19 +371,19 @@ func (e *etcdSchemaRegistry) register(ctx
context.Context, metadata Metadata) er
}
func (e *etcdSchemaRegistry) newWatcher(name string, kind Kind, handler
EventHandler) *watcher {
- return newWatcher(e.client, kind, handler,
e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String())))
+ return newWatcher(e.client, watcherConfig{
+ key: e.prependNamespace(kind.key()),
+ kind: kind,
+ handler: handler,
+ }, e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String())))
}
func listPrefixesForEntity(group, entityPrefix string) string {
- return entityPrefix + "/" + group
+ return path.Join(entityPrefix, group)
}
func formatKey(entityPrefix string, metadata *commonv1.Metadata) string {
- return listPrefixesForEntity(metadata.GetGroup(), entityPrefix) + "/" +
metadata.GetName()
-}
-
-func incrementLastByte(key string) string {
- bb := []byte(key)
- bb[len(bb)-1]++
- return string(bb)
+ return path.Join(
+ listPrefixesForEntity(metadata.GetGroup(), entityPrefix),
+ metadata.GetName())
}
diff --git a/banyand/metadata/schema/etcd_test.go
b/banyand/metadata/schema/etcd_test.go
index 2ed06549..fc98664f 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -96,7 +96,7 @@ func preloadSchema(e Registry) error {
return err
}
for _, entry := range entries {
- data, err := indexRuleStore.ReadFile(indexRuleDir + "/" +
entry.Name())
+ data, err := indexRuleStore.ReadFile(path.Join(indexRuleDir,
entry.Name()))
if err != nil {
return err
}
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index f8d1545c..5afa54d2 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -19,20 +19,16 @@ package schema
import (
"context"
- "strings"
+ "path"
"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
- "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
)
-var (
- groupsKeyPrefix = "/groups/"
- groupMetadataKey = "/__meta_group__"
-)
+var groupsKeyPrefix = "/groups/"
func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, group string)
(*commonv1.Group, error) {
var entity commonv1.Group
@@ -44,24 +40,15 @@ func (e *etcdSchemaRegistry) GetGroup(ctx context.Context,
group string) (*commo
}
func (e *etcdSchemaRegistry) ListGroup(ctx context.Context)
([]*commonv1.Group, error) {
- messages, err := e.client.Get(ctx, groupsKeyPrefix,
clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(groupsKeyPrefix)))
+ messages, err := e.listWithPrefix(ctx, groupsKeyPrefix, KindGroup)
if err != nil {
return nil, err
}
-
- var groups []*commonv1.Group
- for _, kv := range messages.Kvs {
- // kv.Key = "/groups/" + {group} + "/__meta_info__"
- if strings.HasSuffix(string(kv.Key), groupMetadataKey) {
- message := &commonv1.Group{}
- if innerErr := proto.Unmarshal(kv.Value, message);
innerErr != nil {
- return nil, innerErr
- }
- groups = append(groups, message)
- }
+ entities := make([]*commonv1.Group, 0, len(messages))
+ for _, message := range messages {
+ entities = append(entities, message.(*commonv1.Group))
}
-
- return groups, nil
+ return entities, nil
}
func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, group string)
(bool, error) {
@@ -72,9 +59,9 @@ func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context,
group string) (boo
keysToDelete := allKeys()
deleteOPs := make([]clientv3.Op, 0, len(keysToDelete)+1)
for _, key := range keysToDelete {
- deleteOPs = append(deleteOPs,
clientv3.OpDelete(listPrefixesForEntity(group, key), clientv3.WithPrefix()))
+ deleteOPs = append(deleteOPs,
clientv3.OpDelete(e.prependNamespace(listPrefixesForEntity(group, key)),
clientv3.WithPrefix()))
}
- deleteOPs = append(deleteOPs, clientv3.OpDelete(formatGroupKey(group),
clientv3.WithPrefix()))
+ deleteOPs = append(deleteOPs,
clientv3.OpDelete(e.prependNamespace(formatGroupKey(group)),
clientv3.WithPrefix()))
txnResponse, err := e.client.Txn(ctx).Then(deleteOPs...).Commit()
if err != nil {
return false, err
@@ -109,5 +96,5 @@ func (e *etcdSchemaRegistry) UpdateGroup(ctx
context.Context, group *commonv1.Gr
}
func formatGroupKey(group string) string {
- return groupsKeyPrefix + group + groupMetadataKey
+ return path.Join(groupsKeyPrefix, group)
}
diff --git a/banyand/metadata/schema/node.go b/banyand/metadata/schema/node.go
index c383c134..575b8c89 100644
--- a/banyand/metadata/schema/node.go
+++ b/banyand/metadata/schema/node.go
@@ -19,6 +19,7 @@ package schema
import (
"context"
+ "path"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
)
@@ -57,5 +58,5 @@ func (e *etcdSchemaRegistry) RegisterNode(ctx
context.Context, node *databasev1.
}
func formatNodeKey(name string) string {
- return nodeKeyPrefix + name
+ return path.Join(nodeKeyPrefix, name)
}
diff --git a/banyand/metadata/schema/property.go
b/banyand/metadata/schema/property.go
index 7d9b1c5d..bb76f0a8 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -19,6 +19,7 @@ package schema
import (
"context"
+ "path"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -62,7 +63,7 @@ func (e *etcdSchemaRegistry) ListProperty(ctx
context.Context, container *common
if container.Group == "" {
return nil, BadRequest("container.group", "group should not be
empty")
}
- messages, err := e.listWithPrefix(ctx,
listPrefixesForEntity(container.Group+"/"+container.Name, propertyKeyPrefix),
KindProperty)
+ messages, err := e.listWithPrefix(ctx,
listPrefixesForEntity(path.Join(container.Group, container.Name),
propertyKeyPrefix), KindProperty)
if err != nil {
return nil, err
}
@@ -165,7 +166,7 @@ func (e *etcdSchemaRegistry) DeleteProperty(ctx
context.Context, metadata *prope
func transformKey(metadata *propertyv1.Metadata) *commonv1.Metadata {
return &commonv1.Metadata{
Group: metadata.Container.GetGroup(),
- Name: metadata.Container.Name + "/" + metadata.Id,
+ Name: path.Join(metadata.Container.Name, metadata.Id),
}
}
diff --git a/banyand/metadata/schema/register_test.go
b/banyand/metadata/schema/register_test.go
index 28e7162a..9f05a999 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -58,7 +58,10 @@ var _ = ginkgo.Describe("etcd_register", func() {
embeddedetcd.RootDir(randomTempDir()))
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
<-server.ReadyNotify()
- schemaRegistry, err :=
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
+ schemaRegistry, err := NewEtcdSchemaRegistry(
+ Namespace("test"),
+ ConfigureServerEndpoints(endpoints),
+ )
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
r = schemaRegistry.(*etcdSchemaRegistry)
})
@@ -74,7 +77,9 @@ var _ = ginkgo.Describe("etcd_register", func() {
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(r.get(context.Background(), k,
&databasev1.Node{})).ShouldNot(gomega.HaveOccurred())
gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred())
- schemaRegistry, err :=
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
+ schemaRegistry, err := NewEtcdSchemaRegistry(
+ Namespace("test"),
+ ConfigureServerEndpoints(endpoints))
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
r = schemaRegistry.(*etcdSchemaRegistry)
gomega.Expect(r.get(context.Background(), k,
&databasev1.Node{})).Should(gomega.MatchError(ErrGRPCResourceNotFound))
diff --git a/banyand/metadata/schema/schema_suite_test.go
b/banyand/metadata/schema/schema_suite_test.go
index 383a4f54..2173b3ee 100644
--- a/banyand/metadata/schema/schema_suite_test.go
+++ b/banyand/metadata/schema/schema_suite_test.go
@@ -18,50 +18,13 @@
package schema
import (
- "fmt"
"testing"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
-
- "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
- "github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/test"
- "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
func TestSchema(t *testing.T) {
gomega.RegisterFailHandler(ginkgo.Fail)
ginkgo.RunSpecs(t, "Schema Suite")
}
-
-var (
- server embeddedetcd.Server
- registry *etcdSchemaRegistry
-)
-
-var _ = ginkgo.BeforeSuite(func() {
- gomega.Expect(logger.Init(logger.Logging{
- Env: "dev",
- Level: flags.LogLevel,
- })).To(gomega.Succeed())
- ports, err := test.AllocateFreePorts(2)
- if err != nil {
- panic("fail to find free ports")
- }
- endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
- server, err = embeddedetcd.NewServer(
- embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(randomTempDir()))
- gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
- <-server.ReadyNotify()
- schemaRegistry, err :=
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
- gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
- registry = schemaRegistry.(*etcdSchemaRegistry)
-})
-
-var _ = ginkgo.AfterSuite(func() {
- registry.Close()
- server.Close()
- <-server.StopNotify()
-})
diff --git a/banyand/metadata/schema/watcher.go
b/banyand/metadata/schema/watcher.go
index 3a93828c..bd9b2b8f 100644
--- a/banyand/metadata/schema/watcher.go
+++ b/banyand/metadata/schema/watcher.go
@@ -27,6 +27,12 @@ import (
"github.com/apache/skywalking-banyandb/pkg/run"
)
+type watcherConfig struct {
+ handler EventHandler
+ key string
+ kind Kind
+}
+
type watcher struct {
handler EventHandler
cli *clientv3.Client
@@ -36,12 +42,12 @@ type watcher struct {
kind Kind
}
-func newWatcher(cli *clientv3.Client, kind Kind, handler EventHandler, l
*logger.Logger) *watcher {
+func newWatcher(cli *clientv3.Client, wc watcherConfig, l *logger.Logger)
*watcher {
w := &watcher{
cli: cli,
- key: kind.key(),
- kind: kind,
- handler: handler,
+ key: wc.key,
+ kind: wc.kind,
+ handler: wc.handler,
closer: run.NewCloser(1),
l: l,
}
diff --git a/banyand/metadata/schema/watcher_test.go
b/banyand/metadata/schema/watcher_test.go
index 04d22194..a796c4bb 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -28,6 +28,9 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)
@@ -74,10 +77,37 @@ var _ = ginkgo.Describe("Watcher", func() {
var (
mockedObj *mockedHandler
watcher *watcher
+ server embeddedetcd.Server
+ registry *etcdSchemaRegistry
)
ginkgo.BeforeEach(func() {
mockedObj = newMockedHandler()
+ gomega.Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(gomega.Succeed())
+ ports, err := test.AllocateFreePorts(2)
+ if err != nil {
+ panic("fail to find free ports")
+ }
+ endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d",
ports[0])}
+ server, err = embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(randomTempDir()))
+ gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+ <-server.ReadyNotify()
+ schemaRegistry, err := NewEtcdSchemaRegistry(
+ Namespace("test"),
+ ConfigureServerEndpoints(endpoints),
+ )
+ gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+ registry = schemaRegistry.(*etcdSchemaRegistry)
+ })
+ ginkgo.AfterEach(func() {
+ registry.Close()
+ server.Close()
+ <-server.StopNotify()
})
ginkgo.It("should handle all existing key-value pairs on initial load",
func() {
diff --git a/banyand/observability/pprof.go b/banyand/observability/pprof.go
index 46cbb0da..b5af797c 100644
--- a/banyand/observability/pprof.go
+++ b/banyand/observability/pprof.go
@@ -20,6 +20,7 @@ package observability
import (
"net/http"
"net/http/pprof"
+ "sync"
"time"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -43,6 +44,7 @@ type pprofService struct {
svr *http.Server
closer *run.Closer
listenAddr string
+ svrMux sync.Mutex
}
func (p *pprofService) FlagSet() *run.FlagSet {
@@ -70,6 +72,8 @@ func (p *pprofService) Serve() run.StopNotify {
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+ p.svrMux.Lock()
+ defer p.svrMux.Unlock()
p.svr = &http.Server{
Addr: p.listenAddr,
ReadHeaderTimeout: 3 * time.Second,
@@ -84,6 +88,10 @@ func (p *pprofService) Serve() run.StopNotify {
}
func (p *pprofService) GracefulStop() {
- _ = p.svr.Close()
+ p.svrMux.Lock()
+ defer p.svrMux.Unlock()
+ if p.svr != nil {
+ _ = p.svr.Close()
+ }
p.closer.CloseThenWait()
}
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index 412dc8e1..00272799 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -20,6 +20,7 @@ package observability
import (
"context"
"net/http"
+ "sync"
"time"
"github.com/robfig/cron/v3"
@@ -48,6 +49,7 @@ type metricService struct {
closer *run.Closer
scheduler *timestamp.Scheduler
listenAddr string
+ mutex sync.Mutex
}
func (p *metricService) FlagSet() *run.FlagSet {
@@ -70,6 +72,8 @@ func (p *metricService) Name() string {
func (p *metricService) Serve() run.StopNotify {
p.l = logger.GetLogger(p.Name())
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
clock, _ := timestamp.GetClock(context.TODO())
p.scheduler = timestamp.NewScheduler(p.l, clock)
err := p.scheduler.Register("metrics-collector", cron.Descriptor,
"@every 15s", func(now time.Time, logger *logger.Logger) bool {
@@ -93,6 +97,8 @@ func (p *metricService) Serve() run.StopNotify {
}
func (p *metricService) GracefulStop() {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
if p.scheduler != nil {
p.scheduler.Close()
}
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 09ca07aa..c5b8dc42 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -80,6 +80,10 @@ func (l local) NewBatchPublisher() BatchPublisher {
}
}
+func (*local) GetPort() *uint32 {
+ return nil
+}
+
type localBatchPublisher struct {
local *bus.Bus
}
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 8fb06418..c5c68139 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -53,6 +53,17 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
p.log.Warn().Msg("failed to cast node spec")
return
}
+ var hasDataRole bool
+ for _, r := range node.Roles {
+ if r == databasev1.Role_ROLE_DATA {
+ hasDataRole = true
+ break
+ }
+ }
+ if !hasDataRole {
+ return
+ }
+
address := node.GrpcAddress
if address == "" {
p.log.Warn().Stringer("node", node).Msg("grpc address is empty")
@@ -70,7 +81,7 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
if _, ok := p.clients[name]; ok {
return
}
- conn, err := grpc.Dial(address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithBlock())
+ conn, err := grpc.Dial(address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(retryPolicy))
if err != nil {
p.log.Error().Err(err).Msg("failed to connect to grpc server")
return
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index bada6711..7bb8c531 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -45,6 +45,7 @@ type Client interface {
type Server interface {
run.Unit
bus.Subscriber
+ GetPort() *uint32
}
// BatchPublisher is the interface for publishing data in batch.
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 78833713..2b880f9f 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -73,7 +73,9 @@ type server struct {
// NewServer returns a new gRPC server.
func NewServer() queue.Server {
- return &server{}
+ return &server{
+ listeners: make(map[bus.Topic]bus.MessageListener),
+ }
}
func (s *server) PreRun(_ context.Context) error {
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 7a9b34df..0ce3e6ce 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -38,6 +38,12 @@ import (
var _ Query = (*schemaRepo)(nil)
+// SchemaService allows querying stream schema.
+type SchemaService interface {
+ Query
+ Close()
+}
+
type schemaRepo struct {
resourceSchema.Repository
l *logger.Logger
@@ -61,7 +67,7 @@ func newSchemaRepo(path string, metadata metadata.Repo,
}
// NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) Query {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger)
SchemaService {
r := &schemaRepo{
l: l,
metadata: metadata,
diff --git a/banyand/internal/cmd/storage.go b/pkg/cmdsetup/data.go
similarity index 67%
rename from banyand/internal/cmd/storage.go
rename to pkg/cmdsetup/data.go
index ceadaa17..7a421b12 100644
--- a/banyand/internal/cmd/storage.go
+++ b/pkg/cmdsetup/data.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package cmd
+package cmdsetup
import (
"context"
@@ -33,21 +33,10 @@ import (
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
- "github.com/apache/skywalking-banyandb/pkg/signal"
"github.com/apache/skywalking-banyandb/pkg/version"
)
-var storageGroup = run.NewGroup("storage")
-
-const (
- storageModeData = "data"
- storageModeQuery = "query"
- storageModeMix = "mix"
-)
-
-var flagStorageMode string
-
-func newStorageCmd() *cobra.Command {
+func newDataCmd(runners ...run.Unit) *cobra.Command {
l := logger.GetLogger("bootstrap")
ctx := context.Background()
metaSvc, err := metadata.NewClient(ctx)
@@ -71,59 +60,46 @@ func newStorageCmd() *cobra.Command {
profSvc := observability.NewProfService()
metricSvc := observability.NewMetricService()
- units := []run.Unit{
- new(signal.Handler),
+ var units []run.Unit
+ units = append(units, runners...)
+ units = append(units,
+ metaSvc,
pipeline,
measureSvc,
streamSvc,
q,
profSvc,
- }
+ )
if metricSvc != nil {
units = append(units, metricSvc)
}
- // Meta the run Group units.
- storageGroup.Register(units...)
+ dataGroup := run.NewGroup("data")
+ dataGroup.Register(units...)
logging := logger.Logging{}
- storageCmd := &cobra.Command{
- Use: "storage",
+ dataCmd := &cobra.Command{
+ Use: "data",
Version: version.Build(),
- Short: "Run as the storage server",
+ Short: "Run as the data server",
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err
error) {
if err = config.Load("logging", cmd.Flags()); err !=
nil {
return err
}
return logger.Init(logging)
},
- PreRun: func(cmd *cobra.Command, args []string) {
- if flagStorageMode == storageModeMix {
- return
- }
- switch flagStorageMode {
- case storageModeData:
- storageGroup.Deregister(q)
- case storageModeQuery:
- storageGroup.Deregister(streamSvc)
- storageGroup.Deregister(measureSvc)
- default:
- l.Fatal().Str("mode",
flagStorageMode).Msg("unknown storage mode")
- }
- },
RunE: func(cmd *cobra.Command, args []string) (err error) {
- node, err := common.GenerateNode(nil, nil)
+ node, err := common.GenerateNode(pipeline.GetPort(),
nil)
if err != nil {
return err
}
- logger.GetLogger().Info().Msg("starting as a storage
server")
+ logger.GetLogger().Info().Msg("starting as a data
server")
// Spawn our go routines and wait for shutdown.
- if err :=
storageGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey,
node)); err != nil {
-
logger.GetLogger().Error().Err(err).Stack().Str("name",
storageGroup.Name()).Msg("Exit")
+ if err :=
dataGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey,
node)); err != nil {
+
logger.GetLogger().Error().Err(err).Stack().Str("name",
dataGroup.Name()).Msg("Exit")
os.Exit(-1)
}
return nil
},
}
- storageCmd.Flags().StringVarP(&flagStorageMode, "mode", "m",
storageModeMix, "the storage mode, one of [data, query, mix]")
- storageCmd.Flags().AddFlagSet(storageGroup.RegisterFlags().FlagSet)
- return storageCmd
+ dataCmd.Flags().AddFlagSet(dataGroup.RegisterFlags().FlagSet)
+ return dataCmd
}
diff --git a/banyand/internal/cmd/liaison.go b/pkg/cmdsetup/liaison.go
similarity index 92%
rename from banyand/internal/cmd/liaison.go
rename to pkg/cmdsetup/liaison.go
index d09428f0..3ef2eb44 100644
--- a/banyand/internal/cmd/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package cmd
+package cmdsetup
import (
"context"
@@ -32,13 +32,10 @@ import (
"github.com/apache/skywalking-banyandb/banyand/queue/pub"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
- "github.com/apache/skywalking-banyandb/pkg/signal"
"github.com/apache/skywalking-banyandb/pkg/version"
)
-var liaisonGroup = run.NewGroup("liaison")
-
-func newLiaisonCmd() *cobra.Command {
+func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
l := logger.GetLogger("bootstrap")
ctx := context.Background()
metaSvc, err := metadata.NewClient(ctx)
@@ -54,19 +51,20 @@ func newLiaisonCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate distributed query
service")
}
-
- units := []run.Unit{
- new(signal.Handler),
+ var units []run.Unit
+ units = append(units, runners...)
+ units = append(units,
+ metaSvc,
pipeline,
dQuery,
grpcServer,
httpServer,
profSvc,
- }
+ )
if metricSvc != nil {
units = append(units, metricSvc)
}
- // Meta the run Group units.
+ liaisonGroup := run.NewGroup("liaison")
liaisonGroup.Register(units...)
liaisonCmd := &cobra.Command{
Use: "liaison",
diff --git a/banyand/internal/cmd/root.go b/pkg/cmdsetup/root.go
similarity index 92%
rename from banyand/internal/cmd/root.go
rename to pkg/cmdsetup/root.go
index b1535a2b..6b7df86a 100644
--- a/banyand/internal/cmd/root.go
+++ b/pkg/cmdsetup/root.go
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-// Package cmd is an internal package defining cli commands for BanyanDB.
-package cmd
+// Package cmdsetup implements a real env in which to run tests.
+package cmdsetup
import (
"fmt"
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/version"
)
@@ -39,7 +40,7 @@ const logo = `
`
// NewRoot returns a root command.
-func NewRoot() *cobra.Command {
+func NewRoot(runners ...run.Unit) *cobra.Command {
logging := logger.Logging{}
cmd := &cobra.Command{
DisableAutoGenTag: true,
@@ -63,9 +64,9 @@ BanyanDB, as an observability database, aims to ingest,
analyze and store Metric
cmd.PersistentFlags().StringVar(&logging.Level, "logging-level",
"info", "the root level of logging")
cmd.PersistentFlags().StringArrayVar(&logging.Modules,
"logging-modules", nil, "the specific module")
cmd.PersistentFlags().StringArrayVar(&logging.Levels, "logging-levels",
nil, "the level logging of logging")
- cmd.AddCommand(newStandaloneCmd())
- cmd.AddCommand(newStorageCmd())
- cmd.AddCommand(newLiaisonCmd())
+ cmd.AddCommand(newStandaloneCmd(runners...))
+ cmd.AddCommand(newDataCmd(runners...))
+ cmd.AddCommand(newLiaisonCmd(runners...))
return cmd
}
diff --git a/banyand/internal/cmd/standalone.go b/pkg/cmdsetup/standalone.go
similarity index 94%
rename from banyand/internal/cmd/standalone.go
rename to pkg/cmdsetup/standalone.go
index 8686db07..f0dca2f2 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package cmd
+package cmdsetup
import (
"context"
@@ -34,13 +34,10 @@ import (
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
- "github.com/apache/skywalking-banyandb/pkg/signal"
"github.com/apache/skywalking-banyandb/pkg/version"
)
-var standaloneGroup = run.NewGroup("standalone")
-
-func newStandaloneCmd() *cobra.Command {
+func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
l := logger.GetLogger("bootstrap")
ctx := context.Background()
pipeline := queue.Local()
@@ -65,8 +62,9 @@ func newStandaloneCmd() *cobra.Command {
metricSvc := observability.NewMetricService()
httpServer := http.NewServer()
- units := []run.Unit{
- new(signal.Handler),
+ var units []run.Unit
+ units = append(units, runners...)
+ units = append(units,
pipeline,
metaSvc,
measureSvc,
@@ -75,10 +73,11 @@ func newStandaloneCmd() *cobra.Command {
grpcServer,
httpServer,
profSvc,
- }
+ )
if metricSvc != nil {
units = append(units, metricSvc)
}
+ standaloneGroup := run.NewGroup("standalone")
// Meta the run Group units.
standaloneGroup.Register(units...)
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index ee8f8561..9743503f 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -77,7 +77,7 @@ func (l *Logger) Named(name ...string) *Logger {
level = ml
}
}
- subLogger := root.l.With().Str("module",
moduleBuilder.String()).Logger().Level(level)
+ subLogger := root.get().With().Str("module",
moduleBuilder.String()).Logger().Level(level)
return &Logger{module: module, modules: l.modules, development:
l.development, Logger: &subLogger}
}
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index 46a75ee8..81b401d6 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -74,13 +74,19 @@ func (rl *rootLogger) set(cfg Logging) error {
return nil
}
+func (rl *rootLogger) get() *Logger {
+ rl.m.Lock()
+ defer rl.m.Unlock()
+ return rl.l
+}
+
// GetLogger return logger with a scope.
func GetLogger(scope ...string) *Logger {
root.verify()
if len(scope) < 1 {
return root.l
}
- l := root.l
+ l := root.get()
for _, v := range scope {
l = l.Named(v)
}
diff --git a/pkg/test/helpers/etcd.go b/pkg/test/helpers/etcd.go
new file mode 100644
index 00000000..1c0320bc
--- /dev/null
+++ b/pkg/test/helpers/etcd.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package helpers
+
+import (
+ "context"
+ "log"
+ "time"
+
+ clientv3 "go.etcd.io/etcd/client/v3"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+var defaultDialTimeout = 5 * time.Second
+
+// ListKeys lists all keys under the given prefix.
+func ListKeys(serverAddress string, prefix string)
(map[string]*databasev1.Node, error) {
+ client, err := clientv3.New(clientv3.Config{
+ Endpoints: []string{serverAddress},
+ DialTimeout: defaultDialTimeout,
+ })
+ if err != nil {
+ log.Fatalf("Failed to create etcd client: %v", err)
+ }
+ defer client.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(),
defaultDialTimeout)
+ defer cancel()
+
+ resp, err := client.Get(ctx, prefix, clientv3.WithPrefix())
+ if err != nil {
+ return nil, err
+ }
+
+ if resp.Count == 0 {
+ return nil, nil
+ }
+
+ nodeMap := make(map[string]*databasev1.Node)
+ for _, kv := range resp.Kvs {
+ md, err := schema.KindNode.Unmarshal(kv)
+ if err != nil {
+ return nil, err
+ }
+ nodeMap[string(kv.Key)] = md.Spec.(*databasev1.Node)
+ }
+
+ return nodeMap, nil
+}
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index e7fb94c7..622f897b 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -21,6 +21,7 @@ package setup
import (
"context"
"fmt"
+ "sync"
"github.com/onsi/gomega"
@@ -31,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/pkg/cmdsetup"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
@@ -141,3 +143,22 @@ func (p *preloadService) PreRun(ctx context.Context) error
{
func (p *preloadService) SetMeta(meta metadata.Service) {
p.metaSvc = meta
}
+
+// CMD runs the command with given flags.
+func CMD(flags ...string) func() {
+ closer, closeFn := run.NewTester("closer")
+ rootCmd := cmdsetup.NewRoot(closer)
+ rootCmd.SetArgs(flags)
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer func() {
+ wg.Done()
+ }()
+
gomega.Expect(rootCmd.Execute()).ShouldNot(gomega.HaveOccurred())
+ }()
+ return func() {
+ closeFn()
+ wg.Wait()
+ }
+}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index 85c25d6f..e7557c31 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -21,6 +21,7 @@ package stream
import (
"context"
"embed"
+ "path"
"google.golang.org/protobuf/encoding/protojson"
@@ -73,7 +74,7 @@ func PreloadSchema(ctx context.Context, e schema.Registry)
error {
return err
}
for _, entry := range entries {
- data, err := indexRuleStore.ReadFile(indexRuleDir + "/" +
entry.Name())
+ data, err := indexRuleStore.ReadFile(path.Join(indexRuleDir,
entry.Name()))
if err != nil {
return err
}
diff --git a/scripts/build/lint.mk b/scripts/build/lint.mk
index 0899dfc4..394161c0 100644
--- a/scripts/build/lint.mk
+++ b/scripts/build/lint.mk
@@ -29,4 +29,4 @@ lint: $(LINTER) $(REVIVE) ## Run all linters
.PHONY: format
format: $(LINTER)
- $(LINTER) run --fix -c $(root_dir)/.golangci-format.yml
+ $(LINTER) run --fix -c $(root_dir)/.golangci-format.yml ./...
diff --git a/test/integration/distributed/setup/node_test.go
b/test/integration/distributed/setup/node_test.go
new file mode 100644
index 00000000..2d48b784
--- /dev/null
+++ b/test/integration/distributed/setup/node_test.go
@@ -0,0 +1,89 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_setup_test
+
+import (
+ "fmt"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+const host = "127.0.0.1"
+
+var _ = Describe("Node registration", func() {
+ It("should register/unregister a liaison node successfully", func() {
+ namespace := "liaison-test"
+ nodeHost := "liaison-1"
+ ports, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+ addr := fmt.Sprintf("%s:%d", host, ports[0])
+ httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
+ closeFn := setup.CMD("liaison",
+ "--namespace", namespace,
+ "--grpc-host="+host,
+ fmt.Sprintf("--grpc-port=%d", ports[0]),
+ "--http-host="+host,
+ fmt.Sprintf("--http-port=%d", ports[1]),
+ "--http-grpc-addr="+addr,
+ "--etcd-endpoints", etcdEndpoint,
+ "--node-host-provider", "flag",
+ "--node-host", nodeHost)
+ Eventually(helpers.HTTPHealthCheck(httpAddr),
flags.EventuallyTimeout).Should(Succeed())
+ Eventually(func() (map[string]*databasev1.Node, error) {
+ return helpers.ListKeys(etcdEndpoint,
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+ }, flags.EventuallyTimeout).Should(HaveLen(1))
+ closeFn()
+ Eventually(func() (map[string]*databasev1.Node, error) {
+ return helpers.ListKeys(etcdEndpoint,
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+ }, flags.EventuallyTimeout).Should(BeNil())
+ })
+ It("should register/unregister a data node successfully", func() {
+ namespace := "data-test"
+ nodeHost := "data-1"
+ ports, err := test.AllocateFreePorts(1)
+ Expect(err).NotTo(HaveOccurred())
+ addr := fmt.Sprintf("%s:%d", host, ports[0])
+ closeFn := setup.CMD("data",
+ "--namespace", namespace,
+ "--grpc-host="+host,
+ fmt.Sprintf("--grpc-port=%d", ports[0]),
+ "--etcd-endpoints", etcdEndpoint,
+ "--node-host-provider", "flag",
+ "--node-host", nodeHost)
+ Eventually(
+ helpers.HealthCheck(addr, 10*time.Second,
10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
+ flags.EventuallyTimeout).Should(Succeed())
+ Eventually(func() (map[string]*databasev1.Node, error) {
+ return helpers.ListKeys(etcdEndpoint,
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+ }, flags.EventuallyTimeout).Should(HaveLen(1))
+ closeFn()
+ Eventually(func() (map[string]*databasev1.Node, error) {
+ return helpers.ListKeys(etcdEndpoint,
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+ }, flags.EventuallyTimeout).Should(BeNil())
+ })
+})
diff --git a/banyand/metadata/schema/schema_suite_test.go
b/test/integration/distributed/setup/setup_suite_test.go
similarity index 51%
copy from banyand/metadata/schema/schema_suite_test.go
copy to test/integration/distributed/setup/setup_suite_test.go
index 383a4f54..3a51ab8c 100644
--- a/banyand/metadata/schema/schema_suite_test.go
+++ b/test/integration/distributed/setup/setup_suite_test.go
@@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-package schema
+// Package integration_setup_test is a integration test suite.
+package integration_setup_test
import (
"fmt"
"testing"
- "github.com/onsi/ginkgo/v2"
- "github.com/onsi/gomega"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
"github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -30,38 +32,44 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)
-func TestSchema(t *testing.T) {
- gomega.RegisterFailHandler(ginkgo.Fail)
- ginkgo.RunSpecs(t, "Schema Suite")
+func TestSetup(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Setup Suite")
}
var (
- server embeddedetcd.Server
- registry *etcdSchemaRegistry
+ deferFunc func()
+ goods []gleak.Goroutine
+ etcdEndpoint string
)
-var _ = ginkgo.BeforeSuite(func() {
- gomega.Expect(logger.Init(logger.Logging{
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
Env: "dev",
Level: flags.LogLevel,
- })).To(gomega.Succeed())
+ })).To(Succeed())
+ goods = gleak.Goroutines()
ports, err := test.AllocateFreePorts(2)
- if err != nil {
- panic("fail to find free ports")
- }
- endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
- server, err = embeddedetcd.NewServer(
- embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(randomTempDir()))
- gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+ Expect(err).NotTo(HaveOccurred())
+ dir, spaceDef, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ server, err := embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(dir))
+ Expect(err).ShouldNot(HaveOccurred())
<-server.ReadyNotify()
- schemaRegistry, err :=
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
- gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
- registry = schemaRegistry.(*etcdSchemaRegistry)
+ deferFunc = func() {
+ _ = server.Close()
+ <-server.StopNotify()
+ spaceDef()
+ }
+ return []byte(ep)
+}, func(ep []byte) {
+ etcdEndpoint = string(ep)
})
-var _ = ginkgo.AfterSuite(func() {
- registry.Close()
- server.Close()
- <-server.StopNotify()
+var _ = SynchronizedAfterSuite(func() {}, func() {
+ deferFunc()
+ Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
})
diff --git a/test/integration/cold_query/query_suite_test.go
b/test/integration/standalone/cold_query/query_suite_test.go
similarity index 96%
rename from test/integration/cold_query/query_suite_test.go
rename to test/integration/standalone/cold_query/query_suite_test.go
index 9f3c16df..e595e1a5 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/standalone/cold_query/query_suite_test.go
@@ -38,11 +38,12 @@ import (
casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
casesstreamdata
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+ integration_standalone
"github.com/apache/skywalking-banyandb/test/integration/standalone"
)
func TestIntegrationColdQuery(t *testing.T) {
RegisterFailHandler(Fail)
- RunSpecs(t, "Integration Query Cold Data Suite", Label("integration"))
+ RunSpecs(t, "Integration Query Cold Data Suite",
Label(integration_standalone.Labels...))
}
var (
diff --git a/test/integration/other/measure_test.go
b/test/integration/standalone/other/measure_test.go
similarity index 100%
rename from test/integration/other/measure_test.go
rename to test/integration/standalone/other/measure_test.go
diff --git a/test/integration/other/other_suite_test.go
b/test/integration/standalone/other/other_suite_test.go
similarity index 87%
rename from test/integration/other/other_suite_test.go
rename to test/integration/standalone/other/other_suite_test.go
index b073fc86..c86e9d75 100644
--- a/test/integration/other/other_suite_test.go
+++ b/test/integration/standalone/other/other_suite_test.go
@@ -25,11 +25,12 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ integration_standalone
"github.com/apache/skywalking-banyandb/test/integration/standalone"
)
func TestIntegrationOther(t *testing.T) {
gm.RegisterFailHandler(g.Fail)
- g.RunSpecs(t, "Integration Other Suite", g.Label("integration"))
+ g.RunSpecs(t, "Integration Other Suite",
g.Label(integration_standalone.Labels...))
}
var _ = g.BeforeSuite(func() {
diff --git a/test/integration/other/property_test.go
b/test/integration/standalone/other/property_test.go
similarity index 100%
rename from test/integration/other/property_test.go
rename to test/integration/standalone/other/property_test.go
diff --git a/test/integration/other/testdata/server_cert.pem
b/test/integration/standalone/other/testdata/server_cert.pem
similarity index 100%
rename from test/integration/other/testdata/server_cert.pem
rename to test/integration/standalone/other/testdata/server_cert.pem
diff --git a/test/integration/other/testdata/server_key.pem
b/test/integration/standalone/other/testdata/server_key.pem
similarity index 100%
rename from test/integration/other/testdata/server_key.pem
rename to test/integration/standalone/other/testdata/server_key.pem
diff --git a/test/integration/other/tls_test.go
b/test/integration/standalone/other/tls_test.go
similarity index 100%
rename from test/integration/other/tls_test.go
rename to test/integration/standalone/other/tls_test.go
diff --git a/test/integration/query/query_suite_test.go
b/test/integration/standalone/query/query_suite_test.go
similarity index 96%
rename from test/integration/query/query_suite_test.go
rename to test/integration/standalone/query/query_suite_test.go
index 0ebed247..b9852cbb 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -38,11 +38,12 @@ import (
casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
casesstreamdata
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+ integration_standalone
"github.com/apache/skywalking-banyandb/test/integration/standalone"
)
func TestIntegrationQuery(t *testing.T) {
RegisterFailHandler(Fail)
- RunSpecs(t, "Integration Query Suite", Label("integration"))
+ RunSpecs(t, "Integration Query Suite",
Label(integration_standalone.Labels...))
}
var (
diff --git a/banyand/cmd/server/main.go
b/test/integration/standalone/standalone.go
similarity index 73%
copy from banyand/cmd/server/main.go
copy to test/integration/standalone/standalone.go
index 8906f84d..42f20c7a 100644
--- a/banyand/cmd/server/main.go
+++ b/test/integration/standalone/standalone.go
@@ -15,19 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-// Package main implements the executable banyandb server named banyand.
-package main
+// Package standalone is a standalone integration test suite.
+package standalone
-import (
- "fmt"
- "os"
-
- "github.com/apache/skywalking-banyandb/banyand/internal/cmd"
-)
-
-func main() {
- if err := cmd.NewRoot().Execute(); err != nil {
- _, _ = fmt.Fprintln(os.Stderr, err)
- os.Exit(1)
- }
-}
+// Labels is the labels for standalone integration test suite.
+var Labels = []string{"integration", "standalone"}