This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a955dad0 Drop discovery module (#311)
a955dad0 is described below
commit a955dad0ff8f5ded48c1479f29c833b714136581
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Aug 2 14:26:30 2023 +0800
Drop discovery module (#311)
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
README.md | 1 -
api/event/doc.go | 19 --
api/event/measure.go | 43 -----
api/event/stream.go | 43 -----
api/proto/banyandb/database/v1/database.proto | 7 +-
banyand/discovery/discovery.go | 78 --------
banyand/internal/cmd/liaison.go | 10 +-
banyand/internal/cmd/standalone.go | 14 +-
banyand/internal/cmd/storage.go | 12 +-
banyand/liaison/grpc/discovery.go | 252 ++++++++++++++++++++------
banyand/liaison/grpc/registry_test.go | 9 +-
banyand/liaison/grpc/server.go | 39 +---
banyand/liaison/liaison.go | 5 +-
banyand/measure/measure.go | 7 -
banyand/measure/measure_suite_test.go | 20 +-
banyand/measure/metadata.go | 7 +-
banyand/measure/metadata_test.go | 7 -
banyand/measure/service.go | 27 +--
banyand/metadata/allocator.go | 90 +++++++++
banyand/metadata/client.go | 17 +-
banyand/metadata/metadata.go | 2 +
banyand/metadata/schema/checker.go | 14 ++
banyand/metadata/schema/node.go | 53 ++++++
banyand/metadata/schema/schema.go | 60 +++++-
banyand/metadata/schema/shard.go | 76 ++++++++
banyand/queue/local.go | 2 -
banyand/queue/queue.go | 6 +-
banyand/stream/metadata.go | 7 +-
banyand/stream/metadata_test.go | 7 -
banyand/stream/service.go | 24 +--
banyand/stream/stream.go | 7 -
banyand/stream/stream_suite_test.go | 21 +--
dist/LICENSE | 6 +-
docs/api-reference.md | 1 +
go.mod | 6 +-
go.sum | 12 +-
pkg/schema/metadata.go | 133 ++------------
pkg/test/setup/setup.go | 19 +-
39 files changed, 592 insertions(+), 572 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 338d9c6b..bbdec150 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
### Chores
- Bump several dependencies and tools.
+- Drop redundant "discovery" module from banyand. "metadata" module is enough
to play the node and shard discovery role.
## 0.4.0
diff --git a/README.md b/README.md
index 83a32c53..73ae9c7c 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,6 @@ The database research community usually uses [RUM
conjecture](http://daslab.seas
- [ ] Sharding
- [ ] Load balance
- [ ] Distributed query optimizer
-- [ ] Node discovery
- [ ] Data queue
### Data processor
diff --git a/api/event/doc.go b/api/event/doc.go
deleted file mode 100644
index 8a9656c7..00000000
--- a/api/event/doc.go
+++ /dev/null
@@ -1,19 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package event contains metadata syncing topics.
-package event
diff --git a/api/event/measure.go b/api/event/measure.go
deleted file mode 100644
index 68516d6c..00000000
--- a/api/event/measure.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package event
-
-import (
- "github.com/apache/skywalking-banyandb/api/common"
- "github.com/apache/skywalking-banyandb/pkg/bus"
-)
-
-var (
- // MeasureShardEventKindVersion is the version tag of measure shard
event kind.
- MeasureShardEventKindVersion = common.KindVersion{
- Version: "v1",
- Kind: "measure-event-shard",
- }
-
- // MeasureTopicShardEvent is the measure shard event publishing topic.
- MeasureTopicShardEvent =
bus.UniTopic(MeasureShardEventKindVersion.String())
-
- // MeasureEntityEventKindVersion is the version tag of measure entity
kind.
- MeasureEntityEventKindVersion = common.KindVersion{
- Version: "v1",
- Kind: "measure-event-entity",
- }
-
- // MeasureTopicEntityEvent is the measure entity event publishing topic.
- MeasureTopicEntityEvent =
bus.UniTopic(MeasureEntityEventKindVersion.String())
-)
diff --git a/api/event/stream.go b/api/event/stream.go
deleted file mode 100644
index e39f01fb..00000000
--- a/api/event/stream.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package event
-
-import (
- "github.com/apache/skywalking-banyandb/api/common"
- "github.com/apache/skywalking-banyandb/pkg/bus"
-)
-
-var (
- // StreamShardEventKindVersion is the version tag of stream shard
entity kind.
- StreamShardEventKindVersion = common.KindVersion{
- Version: "v1",
- Kind: "stream-event-shard",
- }
-
- // StreamTopicShardEvent is the stream entity event publishing topic.
- StreamTopicShardEvent =
bus.UniTopic(StreamShardEventKindVersion.String())
-
- // StreamEntityEventKindVersion is the version tag of stream entity
kind.
- StreamEntityEventKindVersion = common.KindVersion{
- Version: "v1",
- Kind: "stream-event-entity",
- }
-
- // StreamTopicEntityEvent is the stream entity event publishing topic.
- StreamTopicEntityEvent =
bus.UniTopic(StreamEntityEventKindVersion.String())
-)
diff --git a/api/proto/banyandb/database/v1/database.proto
b/api/proto/banyandb/database/v1/database.proto
index 2e2ac78b..c530b60b 100644
--- a/api/proto/banyandb/database/v1/database.proto
+++ b/api/proto/banyandb/database/v1/database.proto
@@ -27,9 +27,10 @@ option java_package =
"org.apache.skywalking.banyandb.database.v1";
message Node {
string id = 1;
- string addr = 2;
- google.protobuf.Timestamp updated_at = 3;
- google.protobuf.Timestamp created_at = 4;
+ common.v1.Metadata metadata = 2;
+ string addr = 3;
+ google.protobuf.Timestamp updated_at = 4;
+ google.protobuf.Timestamp created_at = 5;
}
message Shard {
diff --git a/banyand/discovery/discovery.go b/banyand/discovery/discovery.go
deleted file mode 100644
index 837ff4ec..00000000
--- a/banyand/discovery/discovery.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package discovery implements the service discovery.
-package discovery
-
-import (
- "context"
-
- "github.com/apache/skywalking-banyandb/pkg/bus"
- "github.com/apache/skywalking-banyandb/pkg/run"
-)
-
-// ServiceRepo provides service subscripting and publishing.
-//
-//go:generate mockgen -destination=./discovery_mock.go -package=discovery
github.com/apache/skywalking-banyandb/banyand/discovery ServiceRepo
-type ServiceRepo interface {
- NodeID() string
- Name() string
- run.Unit
- bus.Subscriber
- bus.Publisher
- run.Service
-}
-
-type repo struct {
- local *bus.Bus
- stopCh chan struct{}
-}
-
-func (r *repo) NodeID() string {
- return "local"
-}
-
-func (r *repo) Name() string {
- return "service-discovery"
-}
-
-func (r *repo) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
- return r.local.Subscribe(topic, listener)
-}
-
-func (r *repo) Publish(topic bus.Topic, message ...bus.Message) (bus.Future,
error) {
- return r.local.Publish(topic, message...)
-}
-
-// NewServiceRepo return a new ServiceRepo.
-func NewServiceRepo(_ context.Context) (ServiceRepo, error) {
- return &repo{
- local: bus.NewBus(),
- stopCh: make(chan struct{}),
- }, nil
-}
-
-func (r *repo) Serve() run.StopNotify {
- return r.stopCh
-}
-
-func (r *repo) GracefulStop() {
- r.local.Close()
- if r.stopCh != nil {
- close(r.stopCh)
- }
-}
diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go
index 6c883468..97dff1e7 100644
--- a/banyand/internal/cmd/liaison.go
+++ b/banyand/internal/cmd/liaison.go
@@ -24,7 +24,6 @@ import (
"github.com/spf13/cobra"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison"
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -42,12 +41,8 @@ var liaisonGroup = run.NewGroup("liaison")
func newLiaisonCmd() *cobra.Command {
l := logger.GetLogger("bootstrap")
ctx := context.Background()
- repo, err := discovery.NewServiceRepo(ctx)
- if err != nil {
- l.Fatal().Err(err).Msg("failed to initiate service repository")
- }
// nolint: staticcheck
- pipeline, err := queue.NewQueue(ctx, repo)
+ pipeline, err := queue.NewQueue(ctx)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate data pipeline")
}
@@ -55,7 +50,7 @@ func newLiaisonCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
- tcp, err := liaison.NewEndpoint(ctx, pipeline, repo, metaSvc)
+ tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate Endpoint transport
layer")
}
@@ -65,7 +60,6 @@ func newLiaisonCmd() *cobra.Command {
units := []run.Unit{
new(signal.Handler),
- repo,
pipeline,
tcp,
httpServer,
diff --git a/banyand/internal/cmd/standalone.go
b/banyand/internal/cmd/standalone.go
index 027e0407..9d6eccaa 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -24,7 +24,6 @@ import (
"github.com/spf13/cobra"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison"
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/measure"
@@ -45,11 +44,7 @@ var standaloneGroup = run.NewGroup("standalone")
func newStandaloneCmd() *cobra.Command {
l := logger.GetLogger("bootstrap")
ctx := context.Background()
- repo, err := discovery.NewServiceRepo(ctx)
- if err != nil {
- l.Fatal().Err(err).Msg("failed to initiate service repository")
- }
- pipeline, err := queue.NewQueue(ctx, repo)
+ pipeline, err := queue.NewQueue(ctx)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate data pipeline")
}
@@ -57,11 +52,11 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
- streamSvc, err := stream.NewService(ctx, metaSvc, repo, pipeline)
+ streamSvc, err := stream.NewService(ctx, metaSvc, pipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate stream service")
}
- measureSvc, err := measure.NewService(ctx, metaSvc, repo, pipeline)
+ measureSvc, err := measure.NewService(ctx, metaSvc, pipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate measure service")
}
@@ -69,7 +64,7 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate query processor")
}
- tcp, err := liaison.NewEndpoint(ctx, pipeline, repo, metaSvc)
+ tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate Endpoint transport
layer")
}
@@ -79,7 +74,6 @@ func newStandaloneCmd() *cobra.Command {
units := []run.Unit{
new(signal.Handler),
- repo,
pipeline,
metaSvc,
measureSvc,
diff --git a/banyand/internal/cmd/storage.go b/banyand/internal/cmd/storage.go
index 80c3311b..7d297fc0 100644
--- a/banyand/internal/cmd/storage.go
+++ b/banyand/internal/cmd/storage.go
@@ -24,7 +24,6 @@ import (
"github.com/spf13/cobra"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -51,12 +50,8 @@ var flagStorageMode string
func newStorageCmd() *cobra.Command {
l := logger.GetLogger("bootstrap")
ctx := context.Background()
- repo, err := discovery.NewServiceRepo(ctx)
- if err != nil {
- l.Fatal().Err(err).Msg("failed to initiate service repository")
- }
// nolint: staticcheck
- pipeline, err := queue.NewQueue(ctx, repo)
+ pipeline, err := queue.NewQueue(ctx)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate data pipeline")
}
@@ -64,11 +59,11 @@ func newStorageCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
- streamSvc, err := stream.NewService(ctx, metaSvc, repo, pipeline)
+ streamSvc, err := stream.NewService(ctx, metaSvc, pipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate stream service")
}
- measureSvc, err := measure.NewService(ctx, metaSvc, repo, pipeline)
+ measureSvc, err := measure.NewService(ctx, metaSvc, pipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate measure service")
}
@@ -82,7 +77,6 @@ func newStorageCmd() *cobra.Command {
units := []run.Unit{
new(signal.Handler),
- repo,
pipeline,
measureSvc,
streamSvc,
diff --git a/banyand/liaison/grpc/discovery.go
b/banyand/liaison/grpc/discovery.go
index bb9f66e1..a2a38841 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -18,7 +18,10 @@
package grpc
import (
+ "context"
+ "fmt"
"sync"
+ "time"
"github.com/pkg/errors"
@@ -26,9 +29,10 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
- "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
)
@@ -36,20 +40,101 @@ import (
var errNotExist = errors.New("the object doesn't exist")
type discoveryService struct {
- shardRepo *shardRepo
- entityRepo *entityRepo
- pipeline queue.Queue
- log *logger.Logger
+ pipeline queue.Queue
+ metadataRepo metadata.Repo
+ shardRepo *shardRepo
+ entityRepo *entityRepo
+ log *logger.Logger
+ kind schema.Kind
}
-func newDiscoveryService(pipeline queue.Queue) *discoveryService {
+func newDiscoveryService(pipeline queue.Queue, kind schema.Kind, metadataRepo
metadata.Repo) *discoveryService {
+ sr := &shardRepo{shardEventsMap: make(map[identity]uint32)}
+ er := &entityRepo{entitiesMap:
make(map[identity]partition.EntityLocator)}
return &discoveryService{
- shardRepo: &shardRepo{shardEventsMap:
make(map[identity]uint32)},
- entityRepo: &entityRepo{entitiesMap:
make(map[identity]partition.EntityLocator)},
- pipeline: pipeline,
+ shardRepo: sr,
+ entityRepo: er,
+ pipeline: pipeline,
+ kind: kind,
+ metadataRepo: metadataRepo,
}
}
+func (ds *discoveryService) initialize() error {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctx)
+ cancel()
+ if err != nil {
+ return err
+ }
+ for _, g := range groups {
+ switch ds.kind {
+ case schema.KindMeasure:
+ case schema.KindStream:
+ default:
+ continue
+ }
+ ctx, cancel := context.WithTimeout(context.Background(),
5*time.Second)
+ shards, innerErr :=
ds.metadataRepo.ShardRegistry().ListShard(ctx, schema.ListOpt{Group:
g.Metadata.Name})
+ cancel()
+ if innerErr != nil {
+ return innerErr
+ }
+ for _, s := range shards {
+ ds.shardRepo.OnAddOrUpdate(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindShard,
+ Name: s.Metadata.Name,
+ Group: s.Metadata.Group,
+ },
+ Spec: s,
+ })
+ }
+
+ switch ds.kind {
+ case schema.KindMeasure:
+ ctx, cancel = context.WithTimeout(context.Background(),
5*time.Second)
+ mm, innerErr :=
ds.metadataRepo.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group:
g.Metadata.Name})
+ cancel()
+ if innerErr != nil {
+ return innerErr
+ }
+ for _, m := range mm {
+ ds.entityRepo.OnAddOrUpdate(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindMeasure,
+ Name: m.Metadata.Name,
+ Group: m.Metadata.Group,
+ },
+ Spec: m,
+ })
+ }
+ case schema.KindStream:
+ ctx, cancel = context.WithTimeout(context.Background(),
5*time.Second)
+ ss, innerErr :=
ds.metadataRepo.StreamRegistry().ListStream(ctx, schema.ListOpt{Group:
g.Metadata.Name})
+ cancel()
+ if innerErr != nil {
+ return innerErr
+ }
+ for _, s := range ss {
+ ds.entityRepo.OnAddOrUpdate(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindStream,
+ Name: s.Metadata.Name,
+ Group: s.Metadata.Group,
+ },
+ Spec: s,
+ })
+ }
+ default:
+ return fmt.Errorf("unsupported kind: %d", ds.kind)
+ }
+ }
+ ds.metadataRepo.RegisterHandler(schema.KindShard, ds.shardRepo)
+ ds.metadataRepo.RegisterHandler(ds.kind, ds.entityRepo)
+ return nil
+}
+
func (ds *discoveryService) SetLogger(log *logger.Logger) {
ds.log = log
ds.shardRepo.log = log
@@ -75,38 +160,46 @@ type identity struct {
group string
}
+func (i identity) String() string {
+ return fmt.Sprintf("%s/%s", i.group, i.name)
+}
+
+var _ schema.EventHandler = (*shardRepo)(nil)
+
type shardRepo struct {
log *logger.Logger
shardEventsMap map[identity]uint32
sync.RWMutex
}
-func (s *shardRepo) Rev(message bus.Message) (resp bus.Message) {
- e, ok := message.Data().(*databasev1.ShardEvent)
- if !ok {
- s.log.Warn().Msg("invalid e data type")
+// OnAddOrUpdate implements schema.EventHandler.
+func (s *shardRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
+ if schemaMetadata.Kind != schema.KindShard {
return
}
- s.setShardNum(e)
-
+ shard := schemaMetadata.Spec.(*databasev1.Shard)
+ idx := getID(shard.GetMetadata())
if le := s.log.Debug(); le.Enabled() {
- le.
- Str("action", databasev1.Action_name[int32(e.Action)]).
- Uint64("shardID", e.Shard.Id).
- Msg("received a shard e")
+ le.Stringer("id", idx).Uint32("total", shard.Total).Msg("shard
added or updated")
}
- return
+ s.RWMutex.Lock()
+ defer s.RWMutex.Unlock()
+ s.shardEventsMap[idx] = shard.Total
}
-func (s *shardRepo) setShardNum(eventVal *databasev1.ShardEvent) {
+// OnDelete implements schema.EventHandler.
+func (s *shardRepo) OnDelete(schemaMetadata schema.Metadata) {
+ if schemaMetadata.Kind != schema.KindShard {
+ return
+ }
+ shard := schemaMetadata.Spec.(*databasev1.Shard)
+ idx := getID(shard.GetMetadata())
+ if le := s.log.Debug(); le.Enabled() {
+ le.Stringer("id", idx).Msg("shard deleted")
+ }
s.RWMutex.Lock()
defer s.RWMutex.Unlock()
- idx := getID(eventVal.GetShard().GetMetadata())
- if eventVal.Action == databasev1.Action_ACTION_PUT {
- s.shardEventsMap[idx] = eventVal.Shard.Total
- } else if eventVal.Action == databasev1.Action_ACTION_DELETE {
- delete(s.shardEventsMap, idx)
- }
+ delete(s.shardEventsMap, idx)
}
func (s *shardRepo) shardNum(idx identity) (uint32, bool) {
@@ -126,49 +219,96 @@ func getID(metadata *commonv1.Metadata) identity {
}
}
+var _ schema.EventHandler = (*entityRepo)(nil)
+
type entityRepo struct {
log *logger.Logger
entitiesMap map[identity]partition.EntityLocator
sync.RWMutex
}
-func (s *entityRepo) Rev(message bus.Message) (resp bus.Message) {
- e, ok := message.Data().(*databasev1.EntityEvent)
- if !ok {
- s.log.Warn().Msg("invalid e data type")
+// OnAddOrUpdate implements schema.EventHandler.
+func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
+ var el partition.EntityLocator
+ var id identity
+ switch schemaMetadata.Kind {
+ case schema.KindMeasure:
+ measure := schemaMetadata.Spec.(*databasev1.Measure)
+ el = partition.NewEntityLocator(measure.TagFamilies,
measure.Entity)
+ id = getID(measure.GetMetadata())
+ case schema.KindStream:
+ stream := schemaMetadata.Spec.(*databasev1.Stream)
+ el = partition.NewEntityLocator(stream.TagFamilies,
stream.Entity)
+ id = getID(stream.GetMetadata())
+ default:
return
}
- id := getID(e.GetSubject())
- if le := s.log.Debug(); le.Enabled() {
+ if le := e.log.Debug(); le.Enabled() {
+ var kind string
+ switch schemaMetadata.Kind {
+ case schema.KindMeasure:
+ kind = "measure"
+ case schema.KindStream:
+ kind = "stream"
+ default:
+ kind = "unknown"
+ }
le.
- Str("action", databasev1.Action_name[int32(e.Action)]).
- Interface("subject", id).
- Msg("received an entity event")
+ Str("action", "add_or_update").
+ Stringer("subject", id).
+ Str("kind", kind).
+ Msg("entity added or updated")
}
- s.RWMutex.Lock()
- defer s.RWMutex.Unlock()
- switch e.Action {
- case databasev1.Action_ACTION_PUT:
- en := make(partition.EntityLocator, 0,
len(e.GetEntityLocator()))
- for _, l := range e.GetEntityLocator() {
- en = append(en, partition.TagLocator{
- FamilyOffset: int(l.FamilyOffset),
- TagOffset: int(l.TagOffset),
- })
+ en := make(partition.EntityLocator, 0, len(el))
+ for _, l := range el {
+ en = append(en, partition.TagLocator{
+ FamilyOffset: l.FamilyOffset,
+ TagOffset: l.TagOffset,
+ })
+ }
+ e.RWMutex.Lock()
+ defer e.RWMutex.Unlock()
+ e.entitiesMap[id] = en
+}
+
+// OnDelete implements schema.EventHandler.
+func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) {
+ var id identity
+ switch schemaMetadata.Kind {
+ case schema.KindMeasure:
+ measure := schemaMetadata.Spec.(*databasev1.Measure)
+ id = getID(measure.GetMetadata())
+ case schema.KindStream:
+ stream := schemaMetadata.Spec.(*databasev1.Stream)
+ id = getID(stream.GetMetadata())
+ default:
+ return
+ }
+ if le := e.log.Debug(); le.Enabled() {
+ var kind string
+ switch schemaMetadata.Kind {
+ case schema.KindMeasure:
+ kind = "measure"
+ case schema.KindStream:
+ kind = "stream"
+ default:
+ kind = "unknown"
}
- s.entitiesMap[id] = en
- case databasev1.Action_ACTION_DELETE:
- delete(s.entitiesMap, id)
- case databasev1.Action_ACTION_UNSPECIFIED:
- s.log.Warn().RawJSON("event", logger.Proto(e)).Msg("ignored
unspecified event")
+ le.
+ Str("action", "delete").
+ Stringer("subject", id).
+ Str("kind", kind).
+ Msg("entity deleted")
}
- return
+ e.RWMutex.Lock()
+ defer e.RWMutex.Unlock()
+ delete(e.entitiesMap, id)
}
-func (s *entityRepo) getLocator(id identity) (partition.EntityLocator, bool) {
- s.RWMutex.RLock()
- defer s.RWMutex.RUnlock()
- el, ok := s.entitiesMap[id]
+func (e *entityRepo) getLocator(id identity) (partition.EntityLocator, bool) {
+ e.RWMutex.RLock()
+ defer e.RWMutex.RUnlock()
+ el, ok := e.entitiesMap[id]
if !ok {
return nil, false
}
diff --git a/banyand/liaison/grpc/registry_test.go
b/banyand/liaison/grpc/registry_test.go
index cc48ce86..4b12a1ac 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -31,7 +31,6 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -173,17 +172,14 @@ var _ = Describe("Registry", func() {
})
func setupForRegistry() func() {
- // Init `Discovery` module
- repo, err := discovery.NewServiceRepo(context.Background())
- Expect(err).NotTo(HaveOccurred())
// Init `Queue` module
- pipeline, err := queue.NewQueue(context.TODO(), repo)
+ pipeline, err := queue.NewQueue(context.TODO())
Expect(err).NotTo(HaveOccurred())
// Init `Metadata` module
metaSvc, err := metadata.NewService(context.TODO())
Expect(err).NotTo(HaveOccurred())
- tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+ tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
var flags []string
metaPath, metaDeferFunc, err := test.NewSpace()
@@ -194,7 +190,6 @@ func setupForRegistry() func() {
"--etcd-listen-peer-url="+listenPeerURL)
deferFunc := test.SetupModules(
flags,
- repo,
pipeline,
metaSvc,
preloadStreamSvc,
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 9ead16c3..2bb90664 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -34,16 +34,14 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
- "github.com/apache/skywalking-banyandb/api/event"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
- "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -61,7 +59,6 @@ var (
type server struct {
pipeline queue.Queue
creds credentials.TransportCredentials
- repo discovery.ServiceRepo
*indexRuleRegistryServer
measureSVC *measureService
log *logger.Logger
@@ -85,16 +82,15 @@ type server struct {
}
// NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline queue.Queue, repo
discovery.ServiceRepo, schemaRegistry metadata.Repo) run.Unit {
+func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry
metadata.Repo) run.Unit {
streamSVC := &streamService{
- discoveryService: newDiscoveryService(pipeline),
+ discoveryService: newDiscoveryService(pipeline,
schema.KindStream, schemaRegistry),
}
measureSVC := &measureService{
- discoveryService: newDiscoveryService(pipeline),
+ discoveryService: newDiscoveryService(pipeline,
schema.KindMeasure, schemaRegistry),
}
s := &server{
pipeline: pipeline,
- repo: repo,
streamSVC: streamSVC,
measureSVC: measureSVC,
streamRegistryServer: &streamRegistryServer{
@@ -127,30 +123,13 @@ func (s *server) PreRun() error {
s.log = logger.GetLogger("liaison-grpc")
s.streamSVC.setLogger(s.log)
s.measureSVC.setLogger(s.log)
- components := []struct {
- discoverySVC *discoveryService
- shardEvent bus.Topic
- entityEvent bus.Topic
- }{
- {
- shardEvent: event.StreamTopicShardEvent,
- entityEvent: event.StreamTopicEntityEvent,
- discoverySVC: s.streamSVC.discoveryService,
- },
- {
- shardEvent: event.MeasureTopicShardEvent,
- entityEvent: event.MeasureTopicEntityEvent,
- discoverySVC: s.measureSVC.discoveryService,
- },
+ components := []*discoveryService{
+ s.streamSVC.discoveryService,
+ s.measureSVC.discoveryService,
}
for _, c := range components {
- c.discoverySVC.SetLogger(s.log)
- err := s.repo.Subscribe(c.shardEvent, c.discoverySVC.shardRepo)
- if err != nil {
- return err
- }
- err = s.repo.Subscribe(c.entityEvent, c.discoverySVC.entityRepo)
- if err != nil {
+ c.SetLogger(s.log)
+ if err := c.initialize(); err != nil {
return err
}
}
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index 539d2f45..a1b657cf 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -21,7 +21,6 @@ package liaison
import (
"context"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -29,6 +28,6 @@ import (
)
// NewEndpoint return a new endpoint which is the entry point for the database
server.
-func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo
discovery.ServiceRepo, schemaRegistry metadata.Repo) (run.Unit, error) {
- return grpc.NewServer(ctx, pipeline, repo, schemaRegistry), nil
+func NewEndpoint(ctx context.Context, pipeline queue.Queue, schemaRegistry
metadata.Repo) (run.Unit, error) {
+ return grpc.NewServer(ctx, pipeline, schemaRegistry), nil
}
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index a5ee5857..9fbfe653 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -31,7 +31,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,7 +50,6 @@ type measure struct {
processorManager *topNProcessorManager
name string
group string
- entityLocator partition.EntityLocator
indexRules []*databasev1.IndexRule
topNAggregations []*databasev1.TopNAggregation
maxObservedModRevision int64
@@ -98,10 +96,6 @@ func (s *measure) MaxObservedModRevision() int64 {
return s.maxObservedModRevision
}
-func (s *measure) EntityLocator() partition.EntityLocator {
- return s.entityLocator
-}
-
func (s *measure) Close() error {
if s.processorManager == nil {
return nil
@@ -111,7 +105,6 @@ func (s *measure) Close() error {
func (s *measure) parseSpec() (err error) {
s.name, s.group = s.schema.GetMetadata().GetName(),
s.schema.GetMetadata().GetGroup()
- s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(),
s.schema.GetEntity())
s.maxObservedModRevision =
int64(math.Max(float64(resourceSchema.ParseMaxModRevision(s.indexRules)),
float64(resourceSchema.ParseMaxModRevision(s.topNAggregations))))
if s.schema.Interval != "" {
s.interval, err = timestamp.ParseDuration(s.schema.Interval)
diff --git a/banyand/measure/measure_suite_test.go
b/banyand/measure/measure_suite_test.go
index 704720d2..a1c829bb 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -25,9 +25,6 @@ import (
"github.com/onsi/gomega"
"go.uber.org/mock/gomock"
- "github.com/apache/skywalking-banyandb/api/event"
- databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -64,26 +61,15 @@ func (p *preloadMeasureService) PreRun() error {
type services struct {
measure measure.Service
metadataService metadata.Service
- repo *discovery.MockServiceRepo
pipeline queue.Queue
}
func setUp() (*services, func()) {
ctrl := gomock.NewController(ginkgo.GinkgoT())
gomega.Expect(ctrl).ShouldNot(gomega.BeNil())
- // Init Discovery
- repo := discovery.NewMockServiceRepo(ctrl)
- repo.EXPECT().NodeID().AnyTimes()
- repo.EXPECT().Name().AnyTimes()
- stopCh := make(chan struct{})
- repo.EXPECT().Serve().Return(stopCh).Times(1)
- repo.EXPECT().GracefulStop().Do(func() { close(stopCh) }).Times(1)
- // Both PreRun and Serve phases send events
- repo.EXPECT().Publish(event.MeasureTopicEntityEvent,
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
- repo.EXPECT().Publish(event.MeasureTopicShardEvent,
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
// Init Pipeline
- pipeline, err := queue.NewQueue(context.TODO(), repo)
+ pipeline, err := queue.NewQueue(context.TODO())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// Init Metadata Service
@@ -91,7 +77,7 @@ func setUp() (*services, func()) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// Init Measure Service
- measureService, err := measure.NewService(context.TODO(),
metadataService, repo, pipeline)
+ measureService, err := measure.NewService(context.TODO(),
metadataService, pipeline)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
var flags []string
@@ -106,7 +92,6 @@ func setUp() (*services, func()) {
flags = append(flags, "--etcd-listen-client-url="+listenClientURL,
"--etcd-listen-peer-url="+listenPeerURL)
moduleDeferFunc := test.SetupModules(
flags,
- repo,
pipeline,
metadataService,
preloadMeasureSvc,
@@ -115,7 +100,6 @@ func setUp() (*services, func()) {
return &services{
measure: measureService,
metadataService: metadataService,
- repo: repo,
pipeline: pipeline,
}, func() {
moduleDeferFunc()
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2a577b26..a86bfcc4 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -28,10 +28,8 @@ import (
"google.golang.org/protobuf/testing/protocmp"
"github.com/apache/skywalking-banyandb/api/common"
- "github.com/apache/skywalking-banyandb/api/event"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -49,7 +47,7 @@ type schemaRepo struct {
metadata metadata.Repo
}
-func newSchemaRepo(path string, metadata metadata.Repo, repo
discovery.ServiceRepo,
+func newSchemaRepo(path string, metadata metadata.Repo,
dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue,
encoderBufferSize, bufferSize int64,
) schemaRepo {
return schemaRepo{
@@ -57,11 +55,8 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo
discovery.ServiceRe
metadata: metadata,
Repository: resourceSchema.NewRepository(
metadata,
- repo,
l,
newSupplier(path, metadata, dbOpts, l, pipeline,
encoderBufferSize, bufferSize),
- event.MeasureTopicShardEvent,
- event.MeasureTopicEntityEvent,
),
}
}
diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go
index fdbada2f..9504c1f9 100644
--- a/banyand/measure/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -24,11 +24,9 @@ import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gleak"
- "github.com/apache/skywalking-banyandb/api/event"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
- "github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)
@@ -54,7 +52,6 @@ var _ = Describe("Metadata", func() {
}).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
})
It("should close the group", func() {
-
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent,
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
deleted, err :=
svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "sw_metric")
Expect(err).ShouldNot(HaveOccurred())
Expect(deleted).Should(BeTrue())
@@ -65,8 +62,6 @@ var _ = Describe("Metadata", func() {
})
It("should add shards", func() {
-
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent,
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).AnyTimes()
-
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent,
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
groupSchema, err :=
svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "sw_metric")
Expect(err).ShouldNot(HaveOccurred())
Expect(groupSchema).ShouldNot(BeNil())
@@ -95,7 +90,6 @@ var _ = Describe("Metadata", func() {
}).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
})
It("should close the measure", func() {
-
svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent,
test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
deleted, err :=
svcs.metadataService.MeasureRegistry().DeleteMeasure(context.TODO(),
&commonv1.Metadata{
Name: "service_cpm_minute",
Group: "sw_metric",
@@ -126,7 +120,6 @@ var _ = Describe("Metadata", func() {
})
It("should update a new measure", func() {
-
svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent,
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
// Remove the first tag from the entity
measureSchema.Entity.TagNames =
measureSchema.Entity.TagNames[1:]
entitySize := len(measureSchema.Entity.TagNames)
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 04c1b9ce..6e46f841 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -61,9 +60,7 @@ type service struct {
writeListener bus.MessageListener
metadata metadata.Repo
pipeline queue.Queue
- repo discovery.ServiceRepo
l *logger.Logger
- stopCh chan struct{}
root string
dbOpts tsdb.DatabaseOpts
BlockEncoderBufferSize run.Bytes
@@ -116,7 +113,7 @@ func (s *service) PreRun() error {
}
path := path.Join(s.root, s.Name())
observability.UpdatePath(path)
- s.schemaRepo = newSchemaRepo(path, s.metadata, s.repo, s.dbOpts,
+ s.schemaRepo = newSchemaRepo(path, s.metadata, s.dbOpts,
s.l, s.pipeline, int64(s.BlockEncoderBufferSize),
int64(s.BlockBufferSize))
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
@@ -144,6 +141,11 @@ func (s *service) PreRun() error {
}
}
}
+ // run a serial watcher
+ go s.schemaRepo.Watcher()
+ s.metadata.
+
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
+ &s.schemaRepo)
s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
err = s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
@@ -177,31 +179,18 @@ func (s *service) sanityCheck(group resourceSchema.Group,
measureSchema *databas
}
func (s *service) Serve() run.StopNotify {
- _ = s.schemaRepo.NotifyAll()
- // run a serial watcher
- go s.schemaRepo.Watcher()
-
- s.metadata.MeasureRegistry().
-
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
- &s.schemaRepo)
-
- return s.stopCh
+ return s.schemaRepo.StopCh()
}
func (s *service) GracefulStop() {
s.schemaRepo.Close()
- if s.stopCh != nil {
- close(s.stopCh)
- }
}
// NewService returns a new service.
-func NewService(_ context.Context, metadata metadata.Repo, repo
discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
+func NewService(_ context.Context, metadata metadata.Repo, pipeline
queue.Queue) (Service, error) {
return &service{
metadata: metadata,
- repo: repo,
pipeline: pipeline,
- stopCh: make(chan struct{}),
dbOpts: tsdb.DatabaseOpts{
IndexGranularity: tsdb.IndexGranularitySeries,
},
diff --git a/banyand/metadata/allocator.go b/banyand/metadata/allocator.go
new file mode 100644
index 00000000..acf52dbf
--- /dev/null
+++ b/banyand/metadata/allocator.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metadata
+
+import (
+ "context"
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var _ schema.EventHandler = (*allocator)(nil)
+
+type allocator struct {
+ schemaRegistry schema.Registry
+ l *logger.Logger
+}
+
+func newAllocator(schemaRegistry schema.Registry, logger *logger.Logger)
*allocator {
+ return &allocator{
+ schemaRegistry: schemaRegistry,
+ l: logger,
+ }
+}
+
+// OnAddOrUpdate implements EventHandler.
+func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) {
+ switch metadata.Kind {
+ case schema.KindGroup:
+ groupSchema := metadata.Spec.(*commonv1.Group)
+ if groupSchema.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED {
+ return
+ }
+ now := time.Now()
+ nowPb := timestamppb.New(now)
+ shardNum := groupSchema.GetResourceOpts().GetShardNum()
+ syncShard := func(id uint64) error {
+ ctx, cancel :=
context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ return a.schemaRegistry.CreateOrUpdateShard(ctx,
&databasev1.Shard{
+ Id: id,
+ Total: shardNum,
+ Metadata: &commonv1.Metadata{
+ Name:
groupSchema.GetMetadata().GetName(),
+ },
+ Node: &databasev1.Node{
+ Id: "TODO",
+ CreatedAt: nowPb,
+ UpdatedAt: nowPb,
+ Addr: "TODO",
+ },
+ })
+ }
+ for i := 0; i < int(shardNum); i++ {
+ if err := syncShard(uint64(i)); err != nil {
+ // TODO: handle error. retry? or do a full sync?
+ a.l.Error().Err(err).Msg("failed to sync shard")
+ }
+ }
+ case schema.KindNode:
+ // TODO: handle node
+ default:
+ return
+ }
+}
+
+// OnDelete implements EventHandler.
+func (*allocator) OnDelete(schema.Metadata) {
+ // TODO: handle delete
+}
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index cfcfec84..0fc967c7 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -27,6 +27,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -39,10 +40,15 @@ func NewClient(_ context.Context) (Service, error) {
type clientService struct {
schemaRegistry schema.Registry
+ alc *allocator
closer *run.Closer
endpoints []string
}
+func (s *clientService) SchemaRegistry() schema.Registry {
+ return s.schemaRegistry
+}
+
func (s *clientService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
fs.StringArrayVar(&s.endpoints, flagEtcdEndpointsName,
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
@@ -62,6 +68,9 @@ func (s *clientService) PreRun() error {
if err != nil {
return err
}
+
+ s.alc = newAllocator(s.schemaRegistry,
logger.GetLogger(s.Name()).Named("allocator"))
+ s.schemaRegistry.RegisterHandler(schema.KindGroup|schema.KindNode,
s.alc)
return nil
}
@@ -75,8 +84,8 @@ func (s *clientService) GracefulStop() {
_ = s.schemaRegistry.Close()
}
-func (s *clientService) SchemaRegistry() schema.Registry {
- return s.schemaRegistry
+func (s *clientService) RegisterHandler(kind schema.Kind, handler
schema.EventHandler) {
+ s.schemaRegistry.RegisterHandler(kind, handler)
}
func (s *clientService) StreamRegistry() schema.Stream {
@@ -107,6 +116,10 @@ func (s *clientService) PropertyRegistry() schema.Property
{
return s.schemaRegistry
}
+func (s *clientService) ShardRegistry() schema.Shard {
+ return s.schemaRegistry
+}
+
func (s *clientService) Name() string {
return "metadata"
}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index f9ab33d7..9bad1e72 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -46,6 +46,8 @@ type Repo interface {
GroupRegistry() schema.Group
TopNAggregationRegistry() schema.TopNAggregation
PropertyRegistry() schema.Property
+ ShardRegistry() schema.Shard
+ RegisterHandler(schema.Kind, schema.EventHandler)
}
// Service is the metadata repository.
diff --git a/banyand/metadata/schema/checker.go
b/banyand/metadata/schema/checker.go
index 4a1a8751..3d669cbc 100644
--- a/banyand/metadata/schema/checker.go
+++ b/banyand/metadata/schema/checker.go
@@ -82,6 +82,20 @@ var checkerMap = map[Kind]equalityChecker{
protocmp.IgnoreFields(&commonv1.Metadata{}, "id",
"create_revision", "mod_revision"),
protocmp.Transform())
},
+ KindNode: func(a, b proto.Message) bool {
+ return cmp.Equal(a, b,
+ protocmp.IgnoreUnknown(),
+ protocmp.IgnoreFields(&databasev1.Node{}, "updated_at"),
+ protocmp.IgnoreFields(&commonv1.Metadata{}, "id",
"create_revision", "mod_revision"),
+ protocmp.Transform())
+ },
+ KindShard: func(a, b proto.Message) bool {
+ return cmp.Equal(a, b,
+ protocmp.IgnoreUnknown(),
+ protocmp.IgnoreFields(&databasev1.Shard{},
"updated_at"),
+ protocmp.IgnoreFields(&commonv1.Metadata{}, "id",
"create_revision", "mod_revision"),
+ protocmp.Transform())
+ },
KindMask: func(a, b proto.Message) bool {
return false
},
diff --git a/banyand/metadata/schema/node.go b/banyand/metadata/schema/node.go
new file mode 100644
index 00000000..0c6dcdb1
--- /dev/null
+++ b/banyand/metadata/schema/node.go
@@ -0,0 +1,53 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+ "context"
+
+ "google.golang.org/protobuf/proto"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var nodeKeyPrefix = "/nodes/"
+
+func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role Role)
([]*databasev1.Node, error) {
+ if role == "" {
+ return nil, BadRequest("group", "group should not be empty")
+ }
+ messages, err := e.listWithPrefix(ctx,
listPrefixesForEntity(string(role), nodeKeyPrefix), func() proto.Message {
+ return &databasev1.Node{}
+ })
+ if err != nil {
+ return nil, err
+ }
+ entities := make([]*databasev1.Node, 0, len(messages))
+ for _, message := range messages {
+ entities = append(entities, message.(*databasev1.Node))
+ }
+ return entities, nil
+}
+
+func formatNodePrefix(role Role) string {
+ return nodeKeyPrefix + string(role)
+}
+
+func formatNodeKey(role Role, id string) string {
+ return formatNodePrefix(role) + "/" + id
+}
diff --git a/banyand/metadata/schema/schema.go
b/banyand/metadata/schema/schema.go
index f9614938..fd97a1d6 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -50,7 +50,11 @@ const (
KindIndexRule
KindTopNAggregation
KindProperty
- KindMask = KindGroup | KindStream | KindMeasure | KindIndexRuleBinding
| KindIndexRule | KindTopNAggregation
+ KindNode
+ KindShard
+ KindMask = KindGroup | KindStream | KindMeasure |
+ KindIndexRuleBinding | KindIndexRule |
+ KindTopNAggregation | KindProperty | KindNode | KindShard
)
// ListOpt contains options to list resources.
@@ -68,6 +72,9 @@ type Registry interface {
Group
TopNAggregation
Property
+ Node
+ Shard
+ RegisterHandler(Kind, EventHandler)
}
// TypeMeta defines the identity and type of an Event.
@@ -103,6 +110,10 @@ func (tm TypeMeta) Unmarshal(data []byte) (m
proto.Message, err error) {
m = &propertyv1.Property{}
case KindTopNAggregation:
m = &databasev1.TopNAggregation{}
+ case KindNode:
+ m = &databasev1.Node{}
+ case KindShard:
+ m = &databasev1.Shard{}
default:
return nil, errUnsupportedEntityType
}
@@ -144,6 +155,17 @@ func (m Metadata) key() (string, error) {
Group: m.Group,
Name: m.Name,
}), nil
+ case KindNode:
+ r, err := strToRole(m.Group)
+ if err != nil {
+ return "", err
+ }
+ return formatNodeKey(r, m.Name), nil
+ case KindShard:
+ return formatShardKey(&commonv1.Metadata{
+ Group: m.Group,
+ Name: m.Name,
+ }), nil
default:
return "", errUnsupportedEntityType
}
@@ -168,7 +190,6 @@ type Stream interface {
CreateStream(ctx context.Context, stream *databasev1.Stream) error
UpdateStream(ctx context.Context, stream *databasev1.Stream) error
DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool,
error)
- RegisterHandler(Kind, EventHandler)
}
// IndexRule allows CRUD index rule schemas in a group.
@@ -196,7 +217,6 @@ type Measure interface {
CreateMeasure(ctx context.Context, measure *databasev1.Measure) error
UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error
DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool,
error)
- RegisterHandler(Kind, EventHandler)
TopNAggregations(ctx context.Context, metadata *commonv1.Metadata)
([]*databasev1.TopNAggregation, error)
}
@@ -226,3 +246,37 @@ type Property interface {
ApplyProperty(ctx context.Context, property *propertyv1.Property,
strategy propertyv1.ApplyRequest_Strategy) (bool, uint32, error)
DeleteProperty(ctx context.Context, metadata *propertyv1.Metadata, tags
[]string) (bool, uint32, error)
}
+
+// Role is the role of node.
+type Role string
+
+const (
+ // RoleMeta is the role of meta node.
+ RoleMeta = "meta"
+ // RoleData is the role of data node.
+ RoleData = "data"
+ // RoleQuery is the role of query node.
+ RoleQuery = "query"
+ // RoleLiaison is the role of liaison node.
+ RoleLiaison = "liaison"
+)
+
+func strToRole(role string) (Role, error) {
+ switch role {
+ case RoleMeta, RoleData, RoleQuery, RoleLiaison:
+ return Role(role), nil
+ default:
+ return "", errors.New("invalid role")
+ }
+}
+
+// Node allows CRUD node schemas in a group.
+type Node interface {
+ ListNode(ctx context.Context, role Role) ([]*databasev1.Node, error)
+}
+
+// Shard allows CRUD shard schemas in a group.
+type Shard interface {
+ CreateOrUpdateShard(ctx context.Context, shard *databasev1.Shard) error
+ ListShard(ctx context.Context, opt ListOpt) ([]*databasev1.Shard, error)
+}
diff --git a/banyand/metadata/schema/shard.go b/banyand/metadata/schema/shard.go
new file mode 100644
index 00000000..caf8b9de
--- /dev/null
+++ b/banyand/metadata/schema/shard.go
@@ -0,0 +1,76 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+ "context"
+
+ "github.com/pkg/errors"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var shardKeyPrefix = "/shards/"
+
+func (e *etcdSchemaRegistry) CreateOrUpdateShard(ctx context.Context, shard
*databasev1.Shard) error {
+ if shard.UpdatedAt != nil {
+ shard.UpdatedAt = timestamppb.Now()
+ }
+ md := Metadata{
+ TypeMeta: TypeMeta{
+ Kind: KindShard,
+ Group: shard.GetMetadata().GetGroup(),
+ Name: shard.GetMetadata().GetName(),
+ },
+ Spec: shard,
+ }
+ err := e.update(ctx, md)
+ if err == nil {
+ return nil
+ }
+ if errors.Is(err, ErrGRPCResourceNotFound) {
+ shard.CreatedAt = shard.UpdatedAt
+ md.Spec = shard
+ return e.create(ctx, md)
+ }
+ return err
+}
+
+func (e *etcdSchemaRegistry) ListShard(ctx context.Context, opt ListOpt)
([]*databasev1.Shard, error) {
+ if opt.Group == "" {
+ return nil, BadRequest("group", "group should not be empty")
+ }
+ messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group,
shardKeyPrefix), func() proto.Message {
+ return &databasev1.Shard{}
+ })
+ if err != nil {
+ return nil, err
+ }
+ entities := make([]*databasev1.Shard, 0, len(messages))
+ for _, message := range messages {
+ entities = append(entities, message.(*databasev1.Shard))
+ }
+ return entities, nil
+}
+
+func formatShardKey(metadata *commonv1.Metadata) string {
+ return formatKey(shardKeyPrefix, metadata)
+}
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index ec4a95e1..14a8632b 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -19,7 +19,6 @@
package queue
import (
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -31,7 +30,6 @@ var (
type local struct {
local *bus.Bus
- repo discovery.ServiceRepo
stopCh chan struct{}
}
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 0946ecb1..e4bc0b33 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -20,7 +20,6 @@ package queue
import (
"context"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -35,10 +34,9 @@ type Queue interface {
run.Service
}
-// NewQueue return a new Queue which relies on the discovery service.
-func NewQueue(_ context.Context, repo discovery.ServiceRepo) (Queue, error) {
+// NewQueue return a new Queue.
+func NewQueue(_ context.Context) (Queue, error) {
return &local{
- repo: repo,
local: bus.NewBus(),
stopCh: make(chan struct{}),
}, nil
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 28e78da8..d69710b1 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -23,10 +23,8 @@ import (
"time"
"github.com/apache/skywalking-banyandb/api/common"
- "github.com/apache/skywalking-banyandb/api/event"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -41,7 +39,7 @@ type schemaRepo struct {
metadata metadata.Repo
}
-func newSchemaRepo(path string, metadata metadata.Repo, repo
discovery.ServiceRepo,
+func newSchemaRepo(path string, metadata metadata.Repo,
bufferSize int64, dbOpts tsdb.DatabaseOpts, l *logger.Logger,
) schemaRepo {
return schemaRepo{
@@ -49,11 +47,8 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo
discovery.ServiceRe
metadata: metadata,
Repository: resourceSchema.NewRepository(
metadata,
- repo,
l,
newSupplier(path, metadata, bufferSize, dbOpts, l),
- event.StreamTopicShardEvent,
- event.StreamTopicEntityEvent,
),
}
}
diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go
index 3377d556..dc184d40 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/stream/metadata_test.go
@@ -24,10 +24,8 @@ import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gleak"
- "github.com/apache/skywalking-banyandb/api/event"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)
@@ -54,7 +52,6 @@ var _ = Describe("Metadata", func() {
}).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
})
It("should close the group", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent,
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
deleted, err :=
svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default")
Expect(err).ShouldNot(HaveOccurred())
Expect(deleted).Should(BeTrue())
@@ -65,8 +62,6 @@ var _ = Describe("Metadata", func() {
})
It("should add shards", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent,
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent,
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
groupSchema, err :=
svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "default")
Expect(err).ShouldNot(HaveOccurred())
Expect(groupSchema).ShouldNot(BeNil())
@@ -95,7 +90,6 @@ var _ = Describe("Metadata", func() {
}).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
})
It("should close the stream", func() {
-
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent,
test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
deleted, err :=
svcs.metadataService.StreamRegistry().DeleteStream(context.TODO(),
&commonv1.Metadata{
Name: "sw",
Group: "default",
@@ -126,7 +120,6 @@ var _ = Describe("Metadata", func() {
})
It("should update a new stream", func() {
-
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent,
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
// Remove the first tag from the entity
streamSchema.Entity.TagNames =
streamSchema.Entity.TagNames[1:]
entitySize := len(streamSchema.Entity.TagNames)
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index d395e9ca..07749f5b 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -26,7 +26,6 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -55,10 +54,8 @@ type service struct {
schemaRepo schemaRepo
metadata metadata.Repo
pipeline queue.Queue
- repo discovery.ServiceRepo
writeListener *writeCallback
l *logger.Logger
- stopCh chan struct{}
root string
dbOpts tsdb.DatabaseOpts
blockBufferSize run.Bytes
@@ -106,7 +103,7 @@ func (s *service) PreRun() error {
}
path := path.Join(s.root, s.Name())
observability.UpdatePath(path)
- s.schemaRepo = newSchemaRepo(path, s.metadata, s.repo,
int64(s.blockBufferSize), s.dbOpts, s.l)
+ s.schemaRepo = newSchemaRepo(path, s.metadata,
int64(s.blockBufferSize), s.dbOpts, s.l)
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_STREAM {
continue
@@ -127,6 +124,10 @@ func (s *service) PreRun() error {
}
}
}
+ // run a serial watcher
+ s.schemaRepo.Watcher()
+
s.metadata.RegisterHandler(schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule,
+ &s.schemaRepo)
s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
@@ -138,32 +139,21 @@ func (s *service) PreRun() error {
}
func (s *service) Serve() run.StopNotify {
- _ = s.schemaRepo.NotifyAll()
- // run a serial watcher
- s.schemaRepo.Watcher()
-
-
s.metadata.StreamRegistry().RegisterHandler(schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule,
- &s.schemaRepo)
- return s.stopCh
+ return s.schemaRepo.StopCh()
}
func (s *service) GracefulStop() {
s.schemaRepo.Close()
- if s.stopCh != nil {
- close(s.stopCh)
- }
}
// NewService returns a new service.
-func NewService(_ context.Context, metadata metadata.Repo, repo
discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
+func NewService(_ context.Context, metadata metadata.Repo, pipeline
queue.Queue) (Service, error) {
return &service{
metadata: metadata,
- repo: repo,
pipeline: pipeline,
dbOpts: tsdb.DatabaseOpts{
EnableGlobalIndex: true,
IndexGranularity: tsdb.IndexGranularityBlock,
},
- stopCh: make(chan struct{}),
}, nil
}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index ded686fe..fb60bd64 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -27,7 +27,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/schema"
)
@@ -43,7 +42,6 @@ type stream struct {
indexWriter *index.Writer
name string
group string
- entityLocator partition.EntityLocator
indexRules []*databasev1.IndexRule
maxObservedModRevision int64
shardNum uint32
@@ -69,17 +67,12 @@ func (s *stream) GetTopN() []*databasev1.TopNAggregation {
return nil
}
-func (s *stream) EntityLocator() partition.EntityLocator {
- return s.entityLocator
-}
-
func (s *stream) Close() error {
return nil
}
func (s *stream) parseSpec() {
s.name, s.group = s.schema.GetMetadata().GetName(),
s.schema.GetMetadata().GetGroup()
- s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(),
s.schema.GetEntity())
s.maxObservedModRevision = schema.ParseMaxModRevision(s.indexRules)
}
diff --git a/banyand/stream/stream_suite_test.go
b/banyand/stream/stream_suite_test.go
index e89987f2..fa5fa2c3 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -25,9 +25,6 @@ import (
. "github.com/onsi/gomega"
"go.uber.org/mock/gomock"
- "github.com/apache/skywalking-banyandb/api/event"
- databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -63,25 +60,13 @@ func (p *preloadStreamService) PreRun() error {
type services struct {
stream *service
metadataService metadata.Service
- repo *discovery.MockServiceRepo
}
func setUp() (*services, func()) {
ctrl := gomock.NewController(GinkgoT())
Expect(ctrl).ShouldNot(BeNil())
- // Init Discovery
- repo := discovery.NewMockServiceRepo(ctrl)
- repo.EXPECT().NodeID().AnyTimes()
- repo.EXPECT().Name().AnyTimes()
- stopCh := make(chan struct{})
- repo.EXPECT().Serve().Return(stopCh).Times(1)
- repo.EXPECT().GracefulStop().Do(func() { close(stopCh) }).Times(1)
- // Both PreRun and Serve phases send events
- repo.EXPECT().Publish(event.StreamTopicEntityEvent,
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 1)
- repo.EXPECT().Publish(event.StreamTopicShardEvent,
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2)
-
// Init Pipeline
- pipeline, err := queue.NewQueue(context.TODO(), repo)
+ pipeline, err := queue.NewQueue(context.TODO())
Expect(err).NotTo(HaveOccurred())
// Init Metadata Service
@@ -90,7 +75,7 @@ func setUp() (*services, func()) {
Expect(err).NotTo(HaveOccurred())
// Init Stream Service
- streamService, err := NewService(context.TODO(), metadataService, repo,
pipeline)
+ streamService, err := NewService(context.TODO(), metadataService,
pipeline)
Expect(err).NotTo(HaveOccurred())
preloadStreamSvc := &preloadStreamService{metaSvc: metadataService}
var flags []string
@@ -105,7 +90,6 @@ func setUp() (*services, func()) {
flags = append(flags, "--etcd-listen-client-url="+listenClientURL,
"--etcd-listen-peer-url="+listenPeerURL)
moduleDeferFunc := test.SetupModules(
flags,
- repo,
pipeline,
metadataService,
preloadStreamSvc,
@@ -114,7 +98,6 @@ func setUp() (*services, func()) {
return &services{
stream: streamService.(*service),
metadataService: metadataService,
- repo: repo,
}, func() {
moduleDeferFunc()
metaDeferFunc()
diff --git a/dist/LICENSE b/dist/LICENSE
index 55639375..87b47d39 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -191,7 +191,7 @@ Apache-2.0 licenses
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da Apache-2.0
github.com/google/btree v1.1.2 Apache-2.0
github.com/google/flatbuffers v1.12.1 Apache-2.0
- github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 Apache-2.0
+ github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6 Apache-2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 Apache-2.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus
v1.0.0-rc.0 Apache-2.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5 Apache-2.0
@@ -287,7 +287,7 @@ BSD-3-Clause licenses
golang.org/x/sys v0.10.0 BSD-3-Clause
golang.org/x/text v0.11.0 BSD-3-Clause
golang.org/x/time v0.3.0 BSD-3-Clause
- golang.org/x/tools v0.11.0 BSD-3-Clause
+ golang.org/x/tools v0.11.1 BSD-3-Clause
google.golang.org/protobuf v1.31.0 BSD-3-Clause
========================================================================
@@ -326,7 +326,7 @@ MIT licenses
github.com/mattn/go-isatty v0.0.19 MIT
github.com/mitchellh/mapstructure v1.5.0 MIT
github.com/onsi/ginkgo/v2 v2.11.0 MIT
- github.com/onsi/gomega v1.27.8 MIT
+ github.com/onsi/gomega v1.27.10 MIT
github.com/pelletier/go-toml/v2 v2.0.8 MIT
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c MIT
github.com/robfig/cron/v3 v3.0.1 MIT
diff --git a/docs/api-reference.md b/docs/api-reference.md
index e587be94..96931a18 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -341,6 +341,7 @@ Metadata is for multi-tenant, multi-model use
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| id | [string](#string) | | |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | |
|
| addr | [string](#string) | | |
| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
diff --git a/go.mod b/go.mod
index 5eea559e..7cf87065 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.4
github.com/oklog/run v1.1.0
github.com/onsi/ginkgo/v2 v2.11.0
- github.com/onsi/gomega v1.27.8
+ github.com/onsi/gomega v1.27.10
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/rs/zerolog v1.29.1
@@ -77,7 +77,7 @@ require (
github.com/golang/snappy v0.0.3
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
- github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
+ github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
@@ -140,7 +140,7 @@ require (
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
- golang.org/x/tools v0.11.0 // indirect
+ golang.org/x/tools v0.11.1 // indirect
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 //
indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
diff --git a/go.sum b/go.sum
index cd1e625b..eb7ac7e6 100644
--- a/go.sum
+++ b/go.sum
@@ -222,8 +222,8 @@ github.com/google/pprof
v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
-github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8
h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA=
-github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod
h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
+github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6
h1:ZgoomqkdjGbQ3+qQXCkvYMCDvGDNg2k5JJDjjdTB6jY=
+github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6/go.mod
h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
github.com/google/renameio v0.1.0/go.mod
h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -311,8 +311,8 @@ github.com/oklog/run v1.1.0
h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod
h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/onsi/ginkgo/v2 v2.11.0
h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod
h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
-github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc=
-github.com/onsi/gomega v1.27.8/go.mod
h1:2J8vzI/s+2shY9XHRApDkdgPo1TKT7P2u6fXeJKFnNQ=
+github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
+github.com/onsi/gomega v1.27.10/go.mod
h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/opentracing/opentracing-go v1.1.0/go.mod
h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pelletier/go-toml v1.2.0/go.mod
h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml/v2 v2.0.8
h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
@@ -707,8 +707,8 @@ golang.org/x/tools
v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod
h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
-golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8=
-golang.org/x/tools v0.11.0/go.mod
h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
+golang.org/x/tools v0.11.1 h1:ojD5zOW8+7dOGzdnNgersm8aPfcDjhMp12UfG93NIMc=
+golang.org/x/tools v0.11.1/go.mod
h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 31f23f57..6d20cd30 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -26,20 +26,15 @@ import (
"sync/atomic"
"time"
- "github.com/onsi/ginkgo/v2"
"github.com/pkg/errors"
"go.uber.org/multierr"
- "google.golang.org/protobuf/types/known/timestamppb"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
- "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -95,7 +90,6 @@ type Resource interface {
GetIndexRules() []*databasev1.IndexRule
GetTopN() []*databasev1.TopNAggregation
MaxObservedModRevision() int64
- EntityLocator() partition.EntityLocator
ResourceSchema
io.Closer
}
@@ -114,8 +108,8 @@ type Repository interface {
StoreGroup(groupMeta *commonv1.Metadata) (*group, error)
LoadGroup(name string) (Group, bool)
LoadResource(metadata *commonv1.Metadata) (Resource, bool)
- NotifyAll() (err error)
Close()
+ StopCh() <-chan struct{}
}
const defaultWorkerNum = 10
@@ -124,38 +118,36 @@ var _ Repository = (*schemaRepo)(nil)
type schemaRepo struct {
metadata metadata.Repo
- repo discovery.ServiceRepo
resourceSupplier ResourceSupplier
l *logger.Logger
data map[string]*group
+ workerCloser *run.Closer
closer *run.Closer
eventCh chan MetadataEvent
- shardTopic bus.Topic
- entityTopic bus.Topic
workerNum int
sync.RWMutex
}
+// StopCh implements Repository.
+func (sr *schemaRepo) StopCh() <-chan struct{} {
+ return sr.closer.CloseNotify()
+}
+
// NewRepository return a new Repository.
func NewRepository(
metadata metadata.Repo,
- repo discovery.ServiceRepo,
l *logger.Logger,
resourceSupplier ResourceSupplier,
- shardTopic bus.Topic,
- entityTopic bus.Topic,
) Repository {
return &schemaRepo{
metadata: metadata,
- repo: repo,
l: l,
resourceSupplier: resourceSupplier,
- shardTopic: shardTopic,
- entityTopic: entityTopic,
data: make(map[string]*group),
eventCh: make(chan MetadataEvent, defaultWorkerNum),
workerNum: defaultWorkerNum,
- closer: run.NewCloser(defaultWorkerNum),
+ workerCloser: run.NewCloser(defaultWorkerNum),
+ closer: run.NewCloser(1),
}
}
@@ -167,7 +159,7 @@ func (sr *schemaRepo) Watcher() {
for i := 0; i < sr.workerNum; i++ {
go func() {
defer func() {
- sr.closer.Done()
+ sr.workerCloser.Done()
if err := recover(); err != nil {
sr.l.Warn().Interface("err",
err).Msg("watching the events")
}
@@ -202,11 +194,11 @@ func (sr *schemaRepo) Watcher() {
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event.
retry...")
select {
case sr.eventCh <- evt:
- case <-sr.closer.CloseNotify():
+ case
<-sr.workerCloser.CloseNotify():
return
}
}
- case <-sr.closer.CloseNotify():
+ case <-sr.workerCloser.CloseNotify():
return
}
}
@@ -232,9 +224,9 @@ func (sr *schemaRepo) StoreGroup(groupMeta
*commonv1.Metadata) (*group, error) {
if err != nil {
return nil, err
}
- g = newGroup(groupSchema, sr.repo, sr.metadata, db, sr.l,
sr.resourceSupplier, sr.entityTopic)
+ g = newGroup(groupSchema, sr.metadata, db, sr.l,
sr.resourceSupplier)
sr.data[name] = g
- return g, sr.notify(groupSchema, databasev1.Action_ACTION_PUT)
+ return g, nil
}
prevGroupSchema := g.GetSchema()
if groupSchema.GetMetadata().GetModRevision() <=
prevGroupSchema.Metadata.ModRevision {
@@ -243,20 +235,12 @@ func (sr *schemaRepo) StoreGroup(groupMeta
*commonv1.Metadata) (*group, error) {
sr.l.Info().Str("group", name).Msg("closing the previous tsdb")
db := g.SupplyTSDB()
db.Close()
- err = sr.notify(prevGroupSchema, databasev1.Action_ACTION_DELETE)
- if err != nil {
- return nil, err
- }
sr.l.Info().Str("group", name).Msg("creating a new tsdb")
newDB, err := sr.resourceSupplier.OpenDB(groupSchema)
if err != nil {
return nil, err
}
g.setDB(newDB)
- err = sr.notify(groupSchema, databasev1.Action_ACTION_PUT)
- if err != nil {
- return nil, err
- }
g.groupSchema.Store(groupSchema)
return g, nil
}
@@ -274,7 +258,6 @@ func (sr *schemaRepo) deleteGroup(groupMeta
*commonv1.Metadata) error {
if err != nil {
return err
}
- _ = sr.notify(g.GetSchema(), databasev1.Action_ACTION_DELETE)
delete(sr.data, name)
return nil
}
@@ -324,61 +307,13 @@ func (sr *schemaRepo) deleteResource(metadata
*commonv1.Metadata) error {
return g.(*group).deleteResource(metadata)
}
-func (sr *schemaRepo) notify(groupSchema *commonv1.Group, action
databasev1.Action) (err error) {
- now := time.Now()
- nowPb := timestamppb.New(now)
- shardNum := groupSchema.GetResourceOpts().GetShardNum()
- for i := 0; i < int(shardNum); i++ {
- _, errInternal := sr.repo.Publish(sr.shardTopic,
bus.NewMessage(bus.MessageID(now.UnixNano()), &databasev1.ShardEvent{
- Shard: &databasev1.Shard{
- Id: uint64(i),
- Total: shardNum,
- Metadata: &commonv1.Metadata{
- Name:
groupSchema.GetMetadata().GetName(),
- },
- Node: &databasev1.Node{
- Id: sr.repo.NodeID(),
- CreatedAt: nowPb,
- UpdatedAt: nowPb,
- Addr: "localhost",
- },
- UpdatedAt: nowPb,
- CreatedAt: nowPb,
- },
- Time: nowPb,
- Action: action,
- }))
- if errors.Is(errInternal, bus.ErrTopicNotExist) {
- return nil
- }
- if errInternal != nil {
- err = multierr.Append(err, errInternal)
- }
- }
- return err
-}
-
-func (sr *schemaRepo) NotifyAll() (err error) {
- sr.RLock()
- defer sr.RUnlock()
- for _, g := range sr.data {
- err = multierr.Append(err, sr.notify(g.GetSchema(),
databasev1.Action_ACTION_PUT))
- g.mapMutex.RLock()
- for _, s := range g.schemaMap {
- err = multierr.Append(err, g.notify(s,
databasev1.Action_ACTION_PUT))
- }
- g.mapMutex.RUnlock()
- }
- return err
-}
-
func (sr *schemaRepo) Close() {
defer func() {
if err := recover(); err != nil {
sr.l.Warn().Interface("err", err).Msg("closing
resource")
}
}()
- sr.closer.CloseThenWait()
+ sr.workerCloser.CloseThenWait()
sr.RLock()
defer sr.RUnlock()
@@ -388,39 +323,35 @@ func (sr *schemaRepo) Close() {
sr.l.Err(err).RawJSON("group",
logger.Proto(g.GetSchema().Metadata)).Msg("closing")
}
}
+ sr.closer.Done()
+ sr.closer.CloseThenWait()
}
var _ Group = (*group)(nil)
type group struct {
resourceSupplier ResourceSupplier
- repo discovery.ServiceRepo
metadata metadata.Repo
db atomic.Value
groupSchema atomic.Pointer[commonv1.Group]
l *logger.Logger
schemaMap map[string]Resource
- entityTopic bus.Topic
mapMutex sync.RWMutex
}
func newGroup(
groupSchema *commonv1.Group,
- repo discovery.ServiceRepo,
metadata metadata.Repo,
db tsdb.Database,
l *logger.Logger,
resourceSupplier ResourceSupplier,
- entityTopic bus.Topic,
) *group {
g := &group{
groupSchema: atomic.Pointer[commonv1.Group]{},
- repo: repo,
metadata: metadata,
l: l,
schemaMap: make(map[string]Resource),
resourceSupplier: resourceSupplier,
- entityTopic: entityTopic,
}
g.groupSchema.Store(groupSchema)
g.db.Store(db)
@@ -492,9 +423,6 @@ func (g *group) StoreResource(resourceSchema
ResourceSchema) (Resource, error) {
if errTS != nil {
return nil, errTS
}
- if err := g.notify(sm, databasev1.Action_ACTION_PUT); err != nil {
- return nil, err
- }
g.schemaMap[key] = sm
if preResource != nil {
_ = preResource.Close()
@@ -510,9 +438,6 @@ func (g *group) deleteResource(metadata *commonv1.Metadata)
error {
if preResource == nil {
return nil
}
- if err := g.notify(preResource, databasev1.Action_ACTION_DELETE); err
!= nil {
- return err
- }
delete(g.schemaMap, key)
_ = preResource.Close()
return nil
@@ -528,30 +453,6 @@ func (g *group) LoadResource(name string) (Resource, bool)
{
return s, true
}
-func (g *group) notify(resource Resource, action databasev1.Action) error {
- defer ginkgo.GinkgoRecover()
- now := time.Now()
- nowPb := timestamppb.New(now)
- entityLocator := resource.EntityLocator()
- locator := make([]*databasev1.EntityEvent_TagLocator, 0,
len(entityLocator))
- for _, tagLocator := range entityLocator {
- locator = append(locator, &databasev1.EntityEvent_TagLocator{
- FamilyOffset: uint32(tagLocator.FamilyOffset),
- TagOffset: uint32(tagLocator.TagOffset),
- })
- }
- _, err := g.repo.Publish(g.entityTopic,
bus.NewMessage(bus.MessageID(now.UnixNano()), &databasev1.EntityEvent{
- Subject: resource.GetMetadata(),
- EntityLocator: locator,
- Time: nowPb,
- Action: action,
- }))
- if errors.Is(err, bus.ErrTopicNotExist) {
- return nil
- }
- return err
-}
-
func (g *group) close() (err error) {
g.mapMutex.RLock()
for _, s := range g.schemaMap {
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 7b654471..18070973 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -24,7 +24,6 @@ import (
"github.com/onsi/gomega"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/measure"
@@ -77,37 +76,37 @@ func CommonWithSchemaLoaders(schemaLoaders []SchemaLoader,
flags ...string) (str
}
func modules(schemaLoaders []SchemaLoader, flags []string) func() {
- // Init `Discovery` module
- repo, err := discovery.NewServiceRepo(context.Background())
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
// Init `Queue` module
- pipeline, err := queue.NewQueue(context.TODO(), repo)
+ pipeline, err := queue.NewQueue(context.TODO())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// Init `Metadata` module
metaSvc, err := metadata.NewService(context.TODO())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// Init `Stream` module
- streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo,
pipeline)
+ streamSvc, err := stream.NewService(context.TODO(), metaSvc, pipeline)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// Init `Measure` module
- measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo,
pipeline)
+ measureSvc, err := measure.NewService(context.TODO(), metaSvc, pipeline)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// Init `Query` module
q, err := query.NewService(context.TODO(), streamSvc, measureSvc,
metaSvc, pipeline)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
- tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+ tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
httpServer := http.NewService()
units := []run.Unit{
- repo,
pipeline,
metaSvc,
+ streamSvc,
+ measureSvc,
+ q,
+ tcp,
+ httpServer,
}
for _, sl := range schemaLoaders {
sl.SetMeta(metaSvc)
units = append(units, sl)
}
- units = append(units, streamSvc, measureSvc, q, tcp, httpServer)
return test.SetupModules(
flags,