This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 90c441d Add measure module (#86)
90c441d is described below
commit 90c441d9b5468b5bbd1ea1b6231b3e4e0755b9c3
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Feb 25 22:21:57 2022 +0800
Add measure module (#86)
---
banyand/{stream/stream.go => measure/measure.go} | 47 ++---
.../stream_query.go => measure/measure_query.go} | 117 +++--------
banyand/measure/measure_query_test.go | 104 ++++++++++
banyand/measure/measure_suite_test.go | 120 +++++++++++
banyand/measure/measure_write.go | 227 +++++++++++++++++++++
banyand/measure/measure_write_test.go | 79 +++++++
banyand/measure/metadata.go | 207 +++++++++++++++++++
banyand/{stream => measure}/metadata_test.go | 65 +++---
banyand/measure/service.go | 156 ++++++++++++++
banyand/measure/testdata/query_data.json | 107 ++++++++++
banyand/measure/testdata/write_data.json | 107 ++++++++++
banyand/stream/metadata_test.go | 11 +-
banyand/stream/stream.go | 13 +-
banyand/stream/stream_query.go | 71 +------
banyand/stream/stream_query_test.go | 75 +++++--
banyand/stream/stream_suite_test.go | 47 +----
banyand/tsdb/scope.go | 85 ++++++++
pkg/test/matcher.go | 80 ++++++++
pkg/test/measure/etcd.go | 17 +-
pkg/test/measure/testdata/group.json | 27 +++
pkg/test/measure/testdata/measure.json | 24 ---
21 files changed, 1460 insertions(+), 326 deletions(-)
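For orientation before the diff: a minimal sketch of how the new measure module is exercised, mirroring the tests added below. Only measure.Service, Measure, Metadata, and DataPointValue come from this commit; the writeOneDataPoint helper and its inline JSON payload (same shape as testdata/write_data.json) are illustrative assumptions.

    package example

    import (
        "time"

        "github.com/golang/protobuf/jsonpb"
        "google.golang.org/protobuf/types/known/timestamppb"

        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
    )

    // writeOneDataPoint looks up the "cpm" measure in the "default" group and
    // writes a single data point built from the same JSON shape as the test data.
    func writeOneDataPoint(svc measure.Service) error {
        m, err := svc.Measure(&commonv1.Metadata{Name: "cpm", Group: "default"})
        if err != nil {
            return err
        }
        const raw = `{"tag_families":[{"tags":[{"str":{"value":"1"}},{"str":{"value":"minute"}}]}],` +
            `"fields":[{"int":{"value":100}},{"int":{"value":100}},{"int":{"value":1}}]}`
        // Set the timestamp first, then unmarshal the JSON payload, as the write test below does.
        dp := &measurev1.DataPointValue{Timestamp: timestamppb.New(time.Now())}
        if err := jsonpb.UnmarshalString(raw, dp); err != nil {
            return err
        }
        return m.Write(dp)
    }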
diff --git a/banyand/stream/stream.go b/banyand/measure/measure.go
similarity index 69%
copy from banyand/stream/stream.go
copy to banyand/measure/measure.go
index 2856938..481197d 100644
--- a/banyand/stream/stream.go
+++ b/banyand/measure/measure.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package stream
+package measure
import (
"context"
@@ -26,21 +26,18 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
- "github.com/apache/skywalking-banyandb/pkg/schema"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
// a chunk is 1MB
const chunkSize = 1 << 20
-var _ schema.Resource = (*stream)(nil)
-
-type stream struct {
+type measure struct {
name string
group string
shardNum uint32
l *logger.Logger
- // schema is the reference to the spec of the stream
- schema *databasev1.Stream
+ schema *databasev1.Measure
// maxObservedModRevision is the max observed revision of index rules
in the spec
maxObservedModRevision int64
db tsdb.Supplier
@@ -49,49 +46,43 @@ type stream struct {
indexWriter *index.Writer
}
-func (s *stream) GetMetadata() *commonv1.Metadata {
+func (s *measure) GetSchema() *databasev1.Measure {
+ return s.schema
+}
+
+func (s *measure) GetMetadata() *commonv1.Metadata {
return s.schema.Metadata
}
-func (s *stream) GetIndexRules() []*databasev1.IndexRule {
+func (s *measure) GetIndexRules() []*databasev1.IndexRule {
return s.indexRules
}
-func (s *stream) MaxObservedModRevision() int64 {
+func (s *measure) MaxObservedModRevision() int64 {
return s.maxObservedModRevision
}
-func (s *stream) EntityLocator() partition.EntityLocator {
+func (s *measure) EntityLocator() partition.EntityLocator {
return s.entityLocator
}
-func (s *stream) Close() error {
+func (s *measure) Close() error {
return s.indexWriter.Close()
}
-func parseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) {
- maxRevisionForIdxRules = int64(0)
- for _, idxRule := range indexRules {
- if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
- maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
- }
- }
- return
-}
-
-func (s *stream) parseSpec() {
+func (s *measure) parseSpec() {
s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
- s.maxObservedModRevision = parseMaxModRevision(s.indexRules)
+ s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
}
-type streamSpec struct {
- schema *databasev1.Stream
+type measureSpec struct {
+ schema *databasev1.Measure
indexRules []*databasev1.IndexRule
}
-func openStream(shardNum uint32, db tsdb.Supplier, spec streamSpec, l *logger.Logger) (*stream, error) {
- sm := &stream{
+func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger) (*measure, error) {
+ sm := &measure{
shardNum: shardNum,
schema: spec.schema,
indexRules: spec.indexRules,
diff --git a/banyand/stream/stream_query.go b/banyand/measure/measure_query.go
similarity index 53%
copy from banyand/stream/stream_query.go
copy to banyand/measure/measure_query.go
index 2896f0d..4d482b2 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/measure/measure_query.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package stream
+package measure
import (
"io"
@@ -26,10 +26,11 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
- streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/partition"
+ resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
var (
@@ -37,25 +38,27 @@ var (
)
type Query interface {
- Stream(stream *commonv1.Metadata) (Stream, error)
+ LoadGroup(name string) (resourceSchema.Group, bool)
+ Measure(measure *commonv1.Metadata) (Measure, error)
}
-type Stream interface {
+type Measure interface {
io.Closer
- Write(value *streamv1.ElementValue) error
+ Write(value *measurev1.DataPointValue) error
Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
Shard(id common.ShardID) (tsdb.Shard, error)
ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error)
- ParseElementID(item tsdb.Item) (string, error)
+ ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, error)
+ GetSchema() *databasev1.Measure
}
-var _ Stream = (*stream)(nil)
+var _ Measure = (*measure)(nil)
-func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
+func (s *measure) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
wrap := func(shards []tsdb.Shard) []tsdb.Shard {
result := make([]tsdb.Shard, len(shards))
for i := 0; i < len(shards); i++ {
- result[i] = newShardDelegate(tsdb.Entry(s.name), shards[i])
+ result[i] = tsdb.NewScopedShard(tsdb.Entry(s.name), shards[i])
}
return result
}
@@ -76,21 +79,21 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
if err != nil {
return nil, err
}
- return []tsdb.Shard{newShardDelegate(tsdb.Entry(s.name), shard)}, nil
+ return []tsdb.Shard{tsdb.NewScopedShard(tsdb.Entry(s.name), shard)}, nil
}
-func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
+func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
shard, err := s.db.SupplyTSDB().Shard(id)
if err != nil {
return nil, err
}
- return newShardDelegate(tsdb.Entry(s.name), shard), nil
+ return tsdb.NewScopedShard(tsdb.Entry(s.name), shard), nil
}
-func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
- familyRawBytes, err := item.Family(family)
+func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
+ familyRawBytes, err := item.Family(string(familyIdentity(family, TagFlag)))
if err != nil {
- return nil, errors.Wrapf(err, "parse family %s", family)
+ return nil, err
}
tagFamily := &modelv1.TagFamilyForWrite{}
err = proto.Unmarshal(familyRawBytes, tagFamily)
@@ -121,75 +124,21 @@ func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFami
}, err
}
-func (s *stream) ParseElementID(item tsdb.Item) (string, error) {
- rawBytes, err := item.Val()
- if err != nil {
- return "", err
- }
- return string(rawBytes), nil
-}
-
-var _ tsdb.Shard = (*shardDelegate)(nil)
-
-type shardDelegate struct {
- scope tsdb.Entry
- delegated tsdb.Shard
-}
-
-func newShardDelegate(scope tsdb.Entry, delegated tsdb.Shard) tsdb.Shard {
- return &shardDelegate{
- scope: scope,
- delegated: delegated,
+func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, error) {
+ var fieldSpec *databasev1.FieldSpec
+ for _, spec := range s.schema.GetFields() {
+ if spec.GetName() == name {
+ fieldSpec = spec
+ break
+ }
}
-}
-
-func (sd *shardDelegate) Close() error {
- // the delegate can't close the underlying shard
- return nil
-}
-
-func (sd *shardDelegate) ID() common.ShardID {
- return sd.delegated.ID()
-}
-
-func (sd *shardDelegate) Series() tsdb.SeriesDatabase {
- return &seriesDatabaseDelegate{
- scope: sd.scope,
- delegated: sd.delegated.Series(),
+ bytes, err := item.Family(string(familyIdentity(name, encoderFieldFlag(fieldSpec))))
+ if err != nil {
+ return nil, err
}
-}
-
-func (sd *shardDelegate) Index() tsdb.IndexDatabase {
- return sd.delegated.Index()
-}
-
-func (sd *shardDelegate) State() tsdb.ShardState {
- return sd.delegated.State()
-}
-
-var _ tsdb.SeriesDatabase = (*seriesDatabaseDelegate)(nil)
-
-type seriesDatabaseDelegate struct {
- scope tsdb.Entry
- delegated tsdb.SeriesDatabase
-}
-
-func (sdd *seriesDatabaseDelegate) Close() error {
- return nil
-}
-
-func (sdd *seriesDatabaseDelegate) GetByHashKey(key []byte) (tsdb.Series, error) {
- return sdd.delegated.GetByHashKey(key)
-}
-
-func (sdd *seriesDatabaseDelegate) GetByID(id common.SeriesID) (tsdb.Series, error) {
- return sdd.delegated.GetByID(id)
-}
-
-func (sdd *seriesDatabaseDelegate) Get(entity tsdb.Entity) (tsdb.Series, error) {
- return sdd.delegated.Get(entity.Prepend(sdd.scope))
-}
-
-func (sdd *seriesDatabaseDelegate) List(path tsdb.Path) (tsdb.SeriesList, error) {
- return sdd.delegated.List(path.Prepand(sdd.scope))
+ fieldValue := decodeFieldValue(bytes, fieldSpec)
+ return &measurev1.DataPoint_Field{
+ Name: name,
+ Value: fieldValue,
+ }, err
}
diff --git a/banyand/measure/measure_query_test.go b/banyand/measure/measure_query_test.go
new file mode 100644
index 0000000..3a9828d
--- /dev/null
+++ b/banyand/measure/measure_query_test.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure_test
+
+import (
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/measure"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+var _ = Describe("Write", func() {
+ var svcs *services
+ var deferFn func()
+ var measure measure.Measure
+
+ BeforeEach(func() {
+ svcs, deferFn = setUp()
+ var err error
+ measure, err = svcs.measure.Measure(&commonv1.Metadata{
+ Name: "cpm",
+ Group: "default",
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+ AfterEach(func() {
+ deferFn()
+ })
+ It("queries data", func() {
+ baseTime := writeData("query_data.json", measure)
+ shard, err := measure.Shard(0)
+ Expect(err).ShouldNot(HaveOccurred())
+ series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")})
+ Expect(err).ShouldNot(HaveOccurred())
+ seriesSpan, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
+ defer func(seriesSpan tsdb.SeriesSpan) {
+ _ = seriesSpan.Close()
+ }(seriesSpan)
+ Expect(err).ShouldNot(HaveOccurred())
+ seeker, err := seriesSpan.SeekerBuilder().OrderByTime(modelv1.Sort_SORT_DESC).Build()
+ Expect(err).ShouldNot(HaveOccurred())
+ iter, err := seeker.Seek()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(len(iter)).To(Equal(1))
+ defer func(iterator tsdb.Iterator) {
+ _ = iterator.Close()
+ }(iter[0])
+ i := 0
+ expectedFields := [][]int64{{150, 300, 5}, {200, 50, 4}, {100, 100, 1}}
+ for ; iter[0].Next(); i++ {
+ item := iter[0].Val()
+ tagFamily, err := measure.ParseTagFamily("default", item)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(len(tagFamily.Tags)).To(Equal(2))
+ summation, err := measure.ParseField("summation", item)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(summation.GetValue().GetInt().Value).To(Equal(expectedFields[i][0]))
+ count, err := measure.ParseField("count", item)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(count.GetValue().GetInt().Value).To(Equal(expectedFields[i][1]))
+ value, err := measure.ParseField("value", item)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(value.GetValue().GetInt().Value).To(Equal(expectedFields[i][2]))
+ }
+ })
+})
diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go
new file mode 100644
index 0000000..c8c80fb
--- /dev/null
+++ b/banyand/measure/measure_suite_test.go
@@ -0,0 +1,120 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/golang/mock/gomock"
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+
+ "github.com/apache/skywalking-banyandb/api/event"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/measure"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+)
+
+func TestMeasure(t *testing.T) {
+ gomega.RegisterFailHandler(ginkgo.Fail)
+ ginkgo.RunSpecs(t, "Measure Suite")
+}
+
+// BeforeSuite - Init logger
+var _ = ginkgo.BeforeSuite(func() {
+ gomega.Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "info",
+ })).To(gomega.Succeed())
+})
+
+// service to preload measure
+type preloadMeasureService struct {
+ metaSvc metadata.Service
+}
+
+func (p *preloadMeasureService) Name() string {
+ return "preload-measure"
+}
+
+func (p *preloadMeasureService) PreRun() error {
+ return testmeasure.PreloadSchema(p.metaSvc.SchemaRegistry())
+}
+
+type services struct {
+ measure measure.Service
+ metadataService metadata.Service
+ repo *discovery.MockServiceRepo
+ pipeline queue.Queue
+}
+
+func setUp() (*services, func()) {
+ ctrl := gomock.NewController(ginkgo.GinkgoT())
+ gomega.Expect(ctrl).ShouldNot(gomega.BeNil())
+ // Init Discovery
+ repo := discovery.NewMockServiceRepo(ctrl)
+ repo.EXPECT().NodeID().AnyTimes()
+ // Both PreRun and Serve phases send events
+ repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 1)
+ repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2)
+
+ // Init Pipeline
+ pipeline, err := queue.NewQueue(context.TODO(), repo)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ // Init Metadata Service
+ metadataService, err := metadata.NewService(context.TODO())
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ // Init Measure Service
+ measureService, err := measure.NewService(context.TODO(), metadataService, repo, pipeline)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
+ var flags []string
+ metaPath, metaDeferFunc, err := test.NewSpace()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ flags = append(flags, "--metadata-root-path="+metaPath)
+ rootPath, deferFunc, err := test.NewSpace()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ flags = append(flags, "--root-path="+rootPath)
+ moduleDeferFunc := test.SetUpModules(
+ flags,
+ repo,
+ pipeline,
+ metadataService,
+ preloadMeasureSvc,
+ measureService,
+ )
+ return &services{
+ measure: measureService,
+ metadataService: metadataService,
+ repo: repo,
+ pipeline: pipeline,
+ }, func() {
+ moduleDeferFunc()
+ metaDeferFunc()
+ deferFunc()
+ }
+}
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
new file mode 100644
index 0000000..5b777e7
--- /dev/null
+++ b/banyand/measure/measure_write.go
@@ -0,0 +1,227 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "bytes"
+
+ "github.com/pkg/errors"
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb/index"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+var (
+ ErrMalformedElement = errors.New("element is malformed")
+)
+
+const (
+ TagFlag byte = iota
+)
+
+func (s *measure) Write(value *measurev1.DataPointValue) error {
+ entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
+ if err != nil {
+ return err
+ }
+ waitCh := make(chan struct{})
+ err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
+ close(waitCh)
+ })
+ if err != nil {
+ close(waitCh)
+ return err
+ }
+ <-waitCh
+ return nil
+}
+
+func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
+ sm := s.schema
+ fLen := len(value.GetTagFamilies())
+ if fLen < 1 {
+ return errors.Wrap(ErrMalformedElement, "no tag family")
+ }
+ if fLen > len(sm.TagFamilies) {
+ return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+ }
+ shard, err := s.db.SupplyTSDB().Shard(shardID)
+ if err != nil {
+ return err
+ }
+ series, err := shard.Series().GetByHashKey(seriesHashKey)
+ if err != nil {
+ return err
+ }
+ t := value.GetTimestamp().AsTime()
+ wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
+ if err != nil {
+ if wp != nil {
+ _ = wp.Close()
+ }
+ return err
+ }
+ writeFn := func() (tsdb.Writer, error) {
+ builder := wp.WriterBuilder().Time(t)
+ for fi, family := range value.GetTagFamilies() {
+ familySpec := sm.GetTagFamilies()[fi]
+ if len(family.GetTags()) > len(familySpec.GetTags()) {
+ return nil, errors.Wrap(ErrMalformedElement, "tag number is more than expected")
+ }
+ for ti, tag := range family.GetTags() {
+ tagSpec := familySpec.GetTags()[ti]
+ tType, isNull := pbv1.TagValueTypeConv(tag)
+ if isNull {
+ continue
+ }
+ if tType != tagSpec.GetType() {
+ return nil, errors.Wrapf(ErrMalformedElement, "tag %s type is unexpected", tagSpec.GetName())
+ }
+ }
+ bb, errMarshal := proto.Marshal(family)
+ if errMarshal != nil {
+ return nil, errMarshal
+ }
+ builder.Family(familyIdentity(sm.GetTagFamilies()[fi].GetName(), TagFlag), bb)
+ }
+ if len(value.GetFields()) > len(sm.GetFields()) {
+ return nil, errors.Wrap(ErrMalformedElement, "fields number is more than expected")
+ }
+ for fi, fieldValue := range value.GetFields() {
+ fieldSpec := sm.GetFields()[fi]
+ fType, isNull := pbv1.FieldValueTypeConv(fieldValue)
+ if isNull {
+ continue
+ }
+ if fType != fieldSpec.GetFieldType() {
+ return nil, errors.Wrapf(ErrMalformedElement, "field %s type is unexpected", fieldSpec.GetName())
+ }
+ data := encodeFieldValue(fieldValue)
+ if data == nil {
+ continue
+ }
+ builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec)), data)
+ }
+ writer, errWrite := builder.Build()
+ if errWrite != nil {
+ return nil, errWrite
+ }
+ _, errWrite = writer.Write()
+ s.l.Debug().
+ Time("ts", t).
+ Int("ts_nano", t.Nanosecond()).
+ Interface("data", value).
+ Uint64("series_id", uint64(series.ID())).
+ Uint64("item_id", uint64(writer.ItemID().ID)).
+ Int("shard_id", int(shardID)).
+ Msg("write measure")
+ return writer, errWrite
+ }
+ writer, err := writeFn()
+ if err != nil {
+ _ = wp.Close()
+ return err
+ }
+ m := index.Message{
+ LocalWriter: writer,
+ Value: index.Value{
+ TagFamilies: value.GetTagFamilies(),
+ Timestamp: value.GetTimestamp().AsTime(),
+ },
+ BlockCloser: wp,
+ Cb: cb,
+ }
+ s.indexWriter.Write(m)
+ return err
+}
+
+type writeCallback struct {
+ l *logger.Logger
+ schemaRepo *schemaRepo
+}
+
+func setUpWriteCallback(l *logger.Logger, schemaRepo *schemaRepo) *writeCallback {
+ wcb := &writeCallback{
+ l: l,
+ schemaRepo: schemaRepo,
+ }
+ return wcb
+}
+
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+ writeEvent, ok := message.Data().(*measurev1.InternalWriteRequest)
+ if !ok {
+ w.l.Warn().Msg("invalid event data type")
+ return
+ }
+
+ stm, ok := w.schemaRepo.loadMeasure(writeEvent.GetRequest().GetMetadata())
+ if !ok {
+ w.l.Warn().Msg("cannot find measure definition")
+ return
+ }
+ err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil)
+ if err != nil {
+ w.l.Debug().Err(err).Msg("fail to write entity")
+ }
+ return
+}
+
+func familyIdentity(name string, flag byte) []byte {
+ return bytes.Join([][]byte{[]byte(name), {flag}}, nil)
+}
+
+func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
+ switch fieldValue.GetValue().(type) {
+ case *modelv1.FieldValue_Int:
+ return convert.Int64ToBytes(fieldValue.GetInt().GetValue())
+ case *modelv1.FieldValue_Str:
+ return []byte(fieldValue.GetStr().Value)
+ case *modelv1.FieldValue_BinaryData:
+ return fieldValue.GetBinaryData()
+ }
+ return nil
+}
+
+func decodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *modelv1.FieldValue {
+ switch fieldSpec.GetFieldType() {
+ case databasev1.FieldType_FIELD_TYPE_STRING:
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: string(fieldValue)}}}
+ case databasev1.FieldType_FIELD_TYPE_INT:
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: convert.BytesToInt64(fieldValue)}}}
+ case databasev1.FieldType_FIELD_TYPE_DATA_BINARY:
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: fieldValue}}
+ }
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
+}
+
+func encoderFieldFlag(fieldSpec *databasev1.FieldSpec) byte {
+ encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
+ compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
+ return encodingMethod<<4 | compressionMethod
+}
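A reading aid, not part of the commit: the write path above stores each tag family and field under a column-family key of name plus a one-byte flag (familyIdentity); for fields the flag packs the encoding method into the high nibble and the compression method into the low nibble (encoderFieldFlag), and ParseField in measure_query.go recomputes the same key to read a field back. A standalone sketch of that packing, with illustrative method numbers:

    package main

    import "fmt"

    // familyIdentity mirrors the unexported helper above: the family name
    // followed by a single flag byte.
    func familyIdentity(name string, flag byte) []byte {
        return append([]byte(name), flag)
    }

    // fieldFlag mirrors encoderFieldFlag: encoding method in the high nibble,
    // compression method in the low nibble.
    func fieldFlag(encodingMethod, compressionMethod byte) byte {
        return encodingMethod<<4 | compressionMethod
    }

    func main() {
        // Illustrative values only; the real numbers come from the FieldSpec enums.
        flag := fieldFlag(1, 2) // -> 0x12
        fmt.Printf("flag=%#02x key=%q\n", flag, familyIdentity("summation", flag))
    }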
diff --git a/banyand/measure/measure_write_test.go b/banyand/measure/measure_write_test.go
new file mode 100644
index 0000000..eceaaff
--- /dev/null
+++ b/banyand/measure/measure_write_test.go
@@ -0,0 +1,79 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure_test
+
+import (
+ "embed"
+ "encoding/json"
+ "time"
+
+ "github.com/golang/protobuf/jsonpb"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ "github.com/apache/skywalking-banyandb/banyand/measure"
+)
+
+var _ = Describe("Write", func() {
+ var svcs *services
+ var deferFn func()
+ var measure measure.Measure
+
+ BeforeEach(func() {
+ svcs, deferFn = setUp()
+ var err error
+ measure, err = svcs.measure.Measure(&commonv1.Metadata{
+ Name: "cpm",
+ Group: "default",
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+ AfterEach(func() {
+ deferFn()
+ })
+ It("writes data", func() {
+ writeData("write_data.json", measure)
+ })
+})
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+func writeData(dataFile string, measure measure.Measure) (baseTime time.Time) {
+ var templates []interface{}
+ baseTime = time.Now()
+ content, err := dataFS.ReadFile("testdata/" + dataFile)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
+ now := time.Now()
+ for i, template := range templates {
+ rawDataPointValue, errMarshal := json.Marshal(template)
+ Expect(errMarshal).ShouldNot(HaveOccurred())
+ dataPointValue := &measurev1.DataPointValue{}
+ if dataPointValue.Timestamp == nil {
+ dataPointValue.Timestamp = timestamppb.New(now.Add(time.Duration(i) * time.Minute))
+ }
+ Expect(jsonpb.UnmarshalString(string(rawDataPointValue), dataPointValue)).ShouldNot(HaveOccurred())
+ errInner := measure.Write(dataPointValue)
+ Expect(errInner).ShouldNot(HaveOccurred())
+ }
+ return baseTime
+}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
new file mode 100644
index 0000000..2a2e96e
--- /dev/null
+++ b/banyand/measure/metadata.go
@@ -0,0 +1,207 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/api/event"
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
+)
+
+type schemaRepo struct {
+ resourceSchema.Repository
+ l *logger.Logger
+ metadata metadata.Repo
+}
+
+func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo {
+ return schemaRepo{
+ l: l,
+ metadata: metadata,
+ Repository: resourceSchema.NewRepository(
+ metadata,
+ repo,
+ l,
+ newSupplier(path, metadata, l),
+ event.MeasureTopicShardEvent,
+ event.MeasureTopicEntityEvent,
+ ),
+ }
+}
+
+func (sr *schemaRepo) OnAddOrUpdate(m schema.Metadata) {
+ switch m.Kind {
+ case schema.KindGroup:
+ g := m.Spec.(*commonv1.Group)
+ if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
+ return
+ }
+ sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+ Typ: resourceSchema.EventAddOrUpdate,
+ Kind: resourceSchema.EventKindGroup,
+ Metadata: g.GetMetadata(),
+ })
+ case schema.KindMeasure:
+ sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+ Typ: resourceSchema.EventAddOrUpdate,
+ Kind: resourceSchema.EventKindResource,
+ Metadata: m.Spec.(*databasev1.Measure).GetMetadata(),
+ })
+ case schema.KindIndexRuleBinding:
+ irb, ok := m.Spec.(*databasev1.IndexRuleBinding)
+ if !ok {
+ sr.l.Warn().Msg("fail to convert message to IndexRuleBinding")
+ return
+ }
+ if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{
+ Name: irb.GetSubject().GetName(),
+ Group: m.Group,
+ })
+ cancel()
+ if err != nil {
+ sr.l.Error().Err(err).Msg("fail to get subject")
+ return
+ }
+ sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+ Typ: resourceSchema.EventAddOrUpdate,
+ Kind: resourceSchema.EventKindResource,
+ Metadata: stm.GetMetadata(),
+ })
+ }
+ case schema.KindIndexRule:
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ subjects, err := sr.metadata.Subjects(ctx, m.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_MEASURE)
+ cancel()
+ if err != nil {
+ sr.l.Error().Err(err).Msg("fail to get subjects(measure)")
+ return
+ }
+ for _, sub := range subjects {
+ sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+ Typ: resourceSchema.EventAddOrUpdate,
+ Kind: resourceSchema.EventKindResource,
+ Metadata: sub.(*databasev1.Measure).GetMetadata(),
+ })
+ }
+ default:
+ }
+}
+
+func (sr *schemaRepo) OnDelete(m schema.Metadata) {
+ switch m.Kind {
+ case schema.KindGroup:
+ g := m.Spec.(*commonv1.Group)
+ if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
+ return
+ }
+ sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+ Typ: resourceSchema.EventDelete,
+ Kind: resourceSchema.EventKindGroup,
+ Metadata: g.GetMetadata(),
+ })
+ case schema.KindMeasure:
+ sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+ Typ: resourceSchema.EventDelete,
+ Kind: resourceSchema.EventKindResource,
+ Metadata: m.Spec.(*databasev1.Measure).GetMetadata(),
+ })
+ case schema.KindIndexRuleBinding:
+ if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{
+ Name: m.Name,
+ Group: m.Group,
+ })
+ cancel()
+ if err != nil {
+ sr.l.Error().Err(err).Msg("fail to get subject")
+ return
+ }
+ sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+ Typ: resourceSchema.EventDelete,
+ Kind: resourceSchema.EventKindResource,
+ Metadata: stm.GetMetadata(),
+ })
+ }
+ case schema.KindIndexRule:
+ default:
+ }
+}
+
+func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) {
+ r, ok := sr.LoadResource(metadata)
+ if !ok {
+ return nil, false
+ }
+ s, ok := r.(*measure)
+ return s, ok
+}
+
+var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
+
+type supplier struct {
+ path string
+ metadata metadata.Repo
+ l *logger.Logger
+}
+
+func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier {
+ return &supplier{
+ path: path,
+ metadata: metadata,
+ l: l,
+ }
+}
+
+func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resourceSchema.ResourceSpec) (resourceSchema.Resource, error) {
+ measureSchema := spec.Schema.(*databasev1.Measure)
+ return openMeasure(shardNum, db, measureSpec{
+ schema: measureSchema,
+ indexRules: spec.IndexRules,
+ }, s.l)
+}
+func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
+}
+
+func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
+ return tsdb.OpenDatabase(
+ context.TODO(),
+ tsdb.DatabaseOpts{
+ Location: s.path,
+ ShardNum: groupSchema.ResourceOpts.ShardNum,
+ EncodingMethod: tsdb.EncodingMethod{
+ EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+ DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+ },
+ })
+}
diff --git a/banyand/stream/metadata_test.go b/banyand/measure/metadata_test.go
similarity index 60%
copy from banyand/stream/metadata_test.go
copy to banyand/measure/metadata_test.go
index 216bbd4..0dc8120 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package stream
+package measure_test
import (
"context"
@@ -27,6 +27,7 @@ import (
"github.com/apache/skywalking-banyandb/api/event"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
var _ = Describe("Metadata", func() {
@@ -44,24 +45,24 @@ var _ = Describe("Metadata", func() {
Context("Manage group", func() {
It("should pass smoke test", func() {
Eventually(func() bool {
- _, ok := svcs.stream.schemaRepo.LoadGroup("default")
+ _, ok := svcs.measure.LoadGroup("default")
return ok
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
It("should close the group", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2)
+ svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
deleted, err := svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default")
Expect(err).ShouldNot(HaveOccurred())
Expect(deleted).Should(BeTrue())
Eventually(func() bool {
- _, ok := svcs.stream.schemaRepo.LoadGroup("default")
+ _, ok := svcs.measure.LoadGroup("default")
return ok
}).WithTimeout(10 * time.Second).Should(BeFalse())
})
It("should add shards", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2)
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(4)
+ svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
+ svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
groupSchema, err := svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "default")
Expect(err).ShouldNot(HaveOccurred())
Expect(groupSchema).ShouldNot(BeNil())
@@ -70,7 +71,7 @@ var _ = Describe("Metadata", func() {
Expect(svcs.metadataService.GroupRegistry().UpdateGroup(context.TODO(), groupSchema)).Should(Succeed())
Eventually(func() bool {
- group, ok := svcs.stream.schemaRepo.LoadGroup("default")
+ group, ok := svcs.measure.LoadGroup("default")
if !ok {
return false
}
@@ -79,65 +80,65 @@ var _ = Describe("Metadata", func() {
})
})
- Context("Manage stream", func() {
+ Context("Manage measure", func() {
It("should pass smoke test", func() {
Eventually(func() bool {
- _, ok := svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
- Name: "sw",
+ _, err := svcs.measure.Measure(&commonv1.Metadata{
+ Name: "cpm",
Group: "default",
})
- return ok
+ return err == nil
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
- It("should close the stream", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(1)
- deleted, err := svcs.metadataService.StreamRegistry().DeleteStream(context.TODO(), &commonv1.Metadata{
- Name: "sw",
+ It("should close the measure", func() {
+ svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
+ deleted, err := svcs.metadataService.MeasureRegistry().DeleteMeasure(context.TODO(), &commonv1.Metadata{
+ Name: "cpm",
Group: "default",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(deleted).Should(BeTrue())
Eventually(func() bool {
- _, ok := svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
- Name: "sw",
+ _, err := svcs.measure.Measure(&commonv1.Metadata{
+ Name: "cpm",
Group: "default",
})
- return ok
+ return err != nil
}).WithTimeout(10 * time.Second).Should(BeFalse())
})
- Context("Update a stream", func() {
- var streamSchema *databasev1.Stream
+ Context("Update a measure", func() {
+ var measureSchema *databasev1.Measure
BeforeEach(func() {
var err error
- streamSchema, err = svcs.metadataService.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
- Name: "sw",
+ measureSchema, err = svcs.metadataService.MeasureRegistry().GetMeasure(context.TODO(), &commonv1.Metadata{
+ Name: "cpm",
Group: "default",
})
Expect(err).ShouldNot(HaveOccurred())
- Expect(streamSchema).ShouldNot(BeNil())
+ Expect(measureSchema).ShouldNot(BeNil())
})
- It("should update a new stream", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(1)
+ It("should update a new measure", func() {
+ svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
// Remove the first tag from the entity
- streamSchema.Entity.TagNames = streamSchema.Entity.TagNames[1:]
- entitySize := len(streamSchema.Entity.TagNames)
+ measureSchema.Entity.TagNames = measureSchema.Entity.TagNames[1:]
+ entitySize := len(measureSchema.Entity.TagNames)
- Expect(svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), streamSchema)).Should(Succeed())
+ Expect(svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(), measureSchema)).Should(Succeed())
Eventually(func() bool {
- val, ok := svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
- Name: "sw",
+ val, err := svcs.measure.Measure(&commonv1.Metadata{
+ Name: "cpm",
Group: "default",
})
- if !ok {
+ if err != nil {
return false
}
- return len(val.schema.GetEntity().TagNames) == entitySize
+ return len(val.GetSchema().GetEntity().TagNames) == entitySize
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
})
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
new file mode 100644
index 0000000..0181409
--- /dev/null
+++ b/banyand/measure/service.go
@@ -0,0 +1,156 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "context"
+ "time"
+
+ "github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
+)
+
+var (
+ ErrEmptyRootPath = errors.New("root path is empty")
+ ErrMeasureNotExist = errors.New("measure doesn't exist")
+)
+
+type Service interface {
+ run.PreRunner
+ run.Config
+ run.Service
+ Query
+}
+
+var _ Service = (*service)(nil)
+
+type service struct {
+ schemaRepo schemaRepo
+ writeListener *writeCallback
+ l *logger.Logger
+ metadata metadata.Repo
+ root string
+ pipeline queue.Queue
+ repo discovery.ServiceRepo
+ // stop channel for the service
+ stopCh chan struct{}
+}
+
+func (s *service) Measure(metadata *commonv1.Metadata) (Measure, error) {
+ sm, ok := s.schemaRepo.loadMeasure(metadata)
+ if !ok {
+ return nil, errors.WithStack(ErrMeasureNotExist)
+ }
+ return sm, nil
+}
+func (s *service) LoadGroup(name string) (resourceSchema.Group, bool) {
+ return s.schemaRepo.LoadGroup(name)
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+ flagS := run.NewFlagSet("storage")
+ flagS.StringVar(&s.root, "root-path", "/tmp", "the root path of database")
+ return flagS
+}
+
+func (s *service) Validate() error {
+ if s.root == "" {
+ return ErrEmptyRootPath
+ }
+ return nil
+}
+
+func (s *service) Name() string {
+ return "measure"
+}
+
+func (s *service) PreRun() error {
+ s.l = logger.GetLogger(s.Name())
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ groups, err := s.metadata.GroupRegistry().ListGroup(ctx)
+ cancel()
+ if err != nil {
+ return err
+ }
+ s.schemaRepo = newSchemaRepo(s.root, s.metadata, s.repo, s.l)
+ for _, g := range groups {
+ if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
+ continue
+ }
+ gp, err := s.schemaRepo.StoreGroup(g.Metadata)
+ if err != nil {
+ return err
+ }
+ ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+ schemas, err := s.metadata.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group: gp.GetSchema().GetMetadata().Name})
+ cancel()
+ if err != nil {
+ return err
+ }
+ for _, sa := range schemas {
+ if _, innerErr := gp.StoreResource(sa); innerErr != nil {
+ return innerErr
+ }
+ }
+ }
+
+ s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
+
+ errWrite := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
+ if errWrite != nil {
+ return errWrite
+ }
+ return nil
+}
+
+func (s *service) Serve() run.StopNotify {
+ _ = s.schemaRepo.NotifyAll()
+ // run a serial watcher
+ go s.schemaRepo.Watcher()
+
+ s.metadata.MeasureRegistry().RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule,
+ &s.schemaRepo)
+
+ s.stopCh = make(chan struct{})
+ return s.stopCh
+}
+
+func (s *service) GracefulStop() {
+ s.schemaRepo.Close()
+ if s.stopCh != nil {
+ close(s.stopCh)
+ }
+}
+
+// NewService returns a new service
+func NewService(_ context.Context, metadata metadata.Repo, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
+ return &service{
+ metadata: metadata,
+ repo: repo,
+ pipeline: pipeline,
+ }, nil
+}
diff --git a/banyand/measure/testdata/query_data.json b/banyand/measure/testdata/query_data.json
new file mode 100644
index 0000000..4eb3e7e
--- /dev/null
+++ b/banyand/measure/testdata/query_data.json
@@ -0,0 +1,107 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "minute"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "minute"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 200
+ }
+ },
+ {
+ "int": {
+ "value": 50
+ }
+ },
+ {
+ "int": {
+ "value": 4
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "minute"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 150
+ }
+ },
+ {
+ "int": {
+ "value": 300
+ }
+ },
+ {
+ "int": {
+ "value": 5
+ }
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/banyand/measure/testdata/write_data.json b/banyand/measure/testdata/write_data.json
new file mode 100644
index 0000000..fbfe02f
--- /dev/null
+++ b/banyand/measure/testdata/write_data.json
@@ -0,0 +1,107 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "minute"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "minute"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "minute"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go
index 216bbd4..a980b2c 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/stream/metadata_test.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/skywalking-banyandb/api/event"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
var _ = Describe("Metadata", func() {
@@ -49,7 +50,7 @@ var _ = Describe("Metadata", func() {
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
It("should close the group", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2)
+ svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
deleted, err := svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default")
Expect(err).ShouldNot(HaveOccurred())
Expect(deleted).Should(BeTrue())
@@ -60,8 +61,8 @@ var _ = Describe("Metadata", func() {
})
It("should add shards", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2)
- svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(4)
+ svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
+ svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
groupSchema, err := svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "default")
Expect(err).ShouldNot(HaveOccurred())
Expect(groupSchema).ShouldNot(BeNil())
@@ -90,7 +91,7 @@ var _ = Describe("Metadata", func() {
}).WithTimeout(10 * time.Second).Should(BeTrue())
})
It("should close the stream", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(1)
+ svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
deleted, err := svcs.metadataService.StreamRegistry().DeleteStream(context.TODO(), &commonv1.Metadata{
Name: "sw",
Group: "default",
@@ -121,7 +122,7 @@ var _ = Describe("Metadata", func() {
})
It("should update a new stream", func() {
- svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(1)
+ svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
// Remove the first tag from the entity
streamSchema.Entity.TagNames = streamSchema.Entity.TagNames[1:]
entitySize := len(streamSchema.Entity.TagNames)
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 2856938..83854ac 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/schema"
)
@@ -69,20 +70,10 @@ func (s *stream) Close() error {
return s.indexWriter.Close()
}
-func parseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) {
- maxRevisionForIdxRules = int64(0)
- for _, idxRule := range indexRules {
- if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
- maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
- }
- }
- return
-}
-
func (s *stream) parseSpec() {
s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
- s.maxObservedModRevision = parseMaxModRevision(s.indexRules)
+ s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
}
type streamSpec struct {
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 2896f0d..493966f 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -55,7 +55,7 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
wrap := func(shards []tsdb.Shard) []tsdb.Shard {
result := make([]tsdb.Shard, len(shards))
for i := 0; i < len(shards); i++ {
- result[i] = newShardDelegate(tsdb.Entry(s.name), shards[i])
+ result[i] = tsdb.NewScopedShard(tsdb.Entry(s.name), shards[i])
}
return result
}
@@ -76,7 +76,7 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
if err != nil {
return nil, err
}
- return []tsdb.Shard{newShardDelegate(tsdb.Entry(s.name), shard)}, nil
+ return []tsdb.Shard{tsdb.NewScopedShard(tsdb.Entry(s.name), shard)}, nil
}
func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
@@ -84,7 +84,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
if err != nil {
return nil, err
}
- return newShardDelegate(tsdb.Entry(s.name), shard), nil
+ return tsdb.NewScopedShard(tsdb.Entry(s.name), shard), nil
}
func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
@@ -128,68 +128,3 @@ func (s *stream) ParseElementID(item tsdb.Item) (string, error) {
}
return string(rawBytes), nil
}
-
-var _ tsdb.Shard = (*shardDelegate)(nil)
-
-type shardDelegate struct {
- scope tsdb.Entry
- delegated tsdb.Shard
-}
-
-func newShardDelegate(scope tsdb.Entry, delegated tsdb.Shard) tsdb.Shard {
- return &shardDelegate{
- scope: scope,
- delegated: delegated,
- }
-}
-
-func (sd *shardDelegate) Close() error {
- // the delegate can't close the underlying shard
- return nil
-}
-
-func (sd *shardDelegate) ID() common.ShardID {
- return sd.delegated.ID()
-}
-
-func (sd *shardDelegate) Series() tsdb.SeriesDatabase {
- return &seriesDatabaseDelegate{
- scope: sd.scope,
- delegated: sd.delegated.Series(),
- }
-}
-
-func (sd *shardDelegate) Index() tsdb.IndexDatabase {
- return sd.delegated.Index()
-}
-
-func (sd *shardDelegate) State() tsdb.ShardState {
- return sd.delegated.State()
-}
-
-var _ tsdb.SeriesDatabase = (*seriesDatabaseDelegate)(nil)
-
-type seriesDatabaseDelegate struct {
- scope tsdb.Entry
- delegated tsdb.SeriesDatabase
-}
-
-func (sdd *seriesDatabaseDelegate) Close() error {
- return nil
-}
-
-func (sdd *seriesDatabaseDelegate) GetByHashKey(key []byte) (tsdb.Series, error) {
- return sdd.delegated.GetByHashKey(key)
-}
-
-func (sdd *seriesDatabaseDelegate) GetByID(id common.SeriesID) (tsdb.Series, error) {
- return sdd.delegated.GetByID(id)
-}
-
-func (sdd *seriesDatabaseDelegate) Get(entity tsdb.Entity) (tsdb.Series, error) {
- return sdd.delegated.Get(entity.Prepend(sdd.scope))
-}
-
-func (sdd *seriesDatabaseDelegate) List(path tsdb.Path) (tsdb.SeriesList, error) {
- return sdd.delegated.List(path.Prepand(sdd.scope))
-}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index cca1637..cae08e6 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -54,26 +54,27 @@ type shardStruct struct {
type shardsForTest []shardStruct
var _ = Describe("Write", func() {
- var (
- s *stream
- deferFn func()
- )
- BeforeEach(func() {
- var svcs *services
- svcs, deferFn = setUp()
- var ok bool
- s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
- Name: "sw",
- Group: "default",
+ Context("Select shard", func() {
+ var (
+ s *stream
+ deferFn func()
+ )
+
+ BeforeEach(func() {
+ var svcs *services
+ svcs, deferFn = setUp()
+ var ok bool
+ s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
+ Name: "sw",
+ Group: "default",
+ })
+ Expect(ok).To(BeTrue())
})
- Expect(ok).To(BeTrue())
- })
- AfterEach(func() {
- deferFn()
- })
- Context("Select shard", func() {
+ AfterEach(func() {
+ deferFn()
+ })
tests := []struct {
name string
entity tsdb.Entity
@@ -107,11 +108,26 @@ var _ = Describe("Write", func() {
})
}
})
- Context("Querying by local indices", func() {
- var now time.Time
- BeforeEach(func() {
+ Context("Querying by local indices", Ordered, func() {
+ var (
+ s *stream
+ now time.Time
+ deferFn func()
+ )
+ BeforeAll(func() {
+ var svcs *services
+ svcs, deferFn = setUp()
+ var ok bool
+ s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
+ Name: "sw",
+ Group: "default",
+ })
+ Expect(ok).To(BeTrue())
now = setupQueryData("multiple_shards.json", s)
})
+ AfterAll(func() {
+ deferFn()
+ })
When("", func() {
l1 := []string{fmt.Sprintf("series_%d", tsdb.SeriesID(tsdb.Entity{
tsdb.Entry("sw"),
@@ -581,10 +597,25 @@ var _ = Describe("Write", func() {
})
})
- Context("Querying by global indices", func() {
- BeforeEach(func() {
+ Context("Querying by global indices", Ordered, func() {
+ var (
+ s *stream
+ deferFn func()
+ )
+ BeforeAll(func() {
+ var svcs *services
+ svcs, deferFn = setUp()
+ var ok bool
+ s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
+ Name: "sw",
+ Group: "default",
+ })
+ Expect(ok).To(BeTrue())
_ = setupQueryData("global_index.json", s)
})
+ AfterAll(func() {
+ deferFn()
+ })
DescribeTable("", func(traceID string, wantTraceSegmentNum int, wantErr bool) {
shards, errShards := s.Shards(nil)
Expect(errShards).ShouldNot(HaveOccurred())
diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go
index 53b7618..b3d3f60 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -19,7 +19,6 @@ package stream
import (
"context"
- "fmt"
"testing"
"github.com/golang/mock/gomock"
@@ -31,7 +30,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
- "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
@@ -50,47 +48,6 @@ var _ = BeforeSuite(func() {
})).To(Succeed())
})
-var (
- _ gomock.Matcher = (*shardEventMatcher)(nil)
- _ gomock.Matcher = (*entityEventMatcher)(nil)
-)
-
-type shardEventMatcher struct {
- action databasev1.Action
-}
-
-func (s *shardEventMatcher) Matches(x interface{}) bool {
- if m, messageOk := x.(bus.Message); messageOk {
- if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk {
- return evt.Action == s.action
- }
- }
-
- return false
-}
-
-func (s *shardEventMatcher) String() string {
- return fmt.Sprintf("shard-event-matcher(%s)",
databasev1.Action_name[int32(s.action)])
-}
-
-type entityEventMatcher struct {
- action databasev1.Action
-}
-
-func (s *entityEventMatcher) Matches(x interface{}) bool {
- if m, messageOk := x.(bus.Message); messageOk {
- if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk {
- return evt.Action == s.action
- }
- }
-
- return false
-}
-
-func (s *entityEventMatcher) String() string {
- return fmt.Sprintf("entity-event-matcher(%s)",
databasev1.Action_name[int32(s.action)])
-}
-
// service to preload stream
type preloadStreamService struct {
metaSvc metadata.Service
@@ -117,8 +74,8 @@ func setUp() (*services, func()) {
repo := discovery.NewMockServiceRepo(ctrl)
repo.EXPECT().NodeID().AnyTimes()
// Both PreRun and Serve phases send events
- repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(2 * 1)
- repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(2 * 2)
+ repo.EXPECT().Publish(event.StreamTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 1)
+ repo.EXPECT().Publish(event.StreamTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2)
// Init Pipeline
pipeline, err := queue.NewQueue(context.TODO(), repo)
diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go
new file mode 100644
index 0000000..cbbe687
--- /dev/null
+++ b/banyand/tsdb/scope.go
@@ -0,0 +1,85 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb
+
+import "github.com/apache/skywalking-banyandb/api/common"
+
+var _ Shard = (*ScopedShard)(nil)
+
+type ScopedShard struct {
+ scope Entry
+ delegated Shard
+}
+
+func NewScopedShard(scope Entry, delegated Shard) Shard {
+ return &ScopedShard{
+ scope: scope,
+ delegated: delegated,
+ }
+}
+
+func (sd *ScopedShard) Close() error {
+ // the delegate can't close the underlying shard
+ return nil
+}
+
+func (sd *ScopedShard) ID() common.ShardID {
+ return sd.delegated.ID()
+}
+
+func (sd *ScopedShard) Series() SeriesDatabase {
+ return &scopedSeriesDatabase{
+ scope: sd.scope,
+ delegated: sd.delegated.Series(),
+ }
+}
+
+func (sd *ScopedShard) Index() IndexDatabase {
+ return sd.delegated.Index()
+}
+
+func (sd *ScopedShard) State() ShardState {
+ return sd.delegated.State()
+}
+
+var _ SeriesDatabase = (*scopedSeriesDatabase)(nil)
+
+type scopedSeriesDatabase struct {
+ scope Entry
+ delegated SeriesDatabase
+}
+
+func (sdd *scopedSeriesDatabase) Close() error {
+ return nil
+}
+
+func (sdd *scopedSeriesDatabase) GetByHashKey(key []byte) (Series, error) {
+ return sdd.delegated.GetByHashKey(key)
+}
+
+func (sdd *scopedSeriesDatabase) GetByID(id common.SeriesID) (Series, error) {
+ return sdd.delegated.GetByID(id)
+}
+
+func (sdd *scopedSeriesDatabase) Get(entity Entity) (Series, error) {
+ return sdd.delegated.Get(entity.Prepend(sdd.scope))
+}
+
+func (sdd *scopedSeriesDatabase) List(path Path) (SeriesList, error) {
+ return sdd.delegated.List(path.Prepand(sdd.scope))
+}
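The pattern introduced in scope.go is plain delegation: a ScopedShard forwards every call to the wrapped shard untouched, except that series lookups get the scope entry prepended, so several resources can share one physical shard without their series keys colliding. Below is a minimal, self-contained Go sketch of that idea; Entry, Entity and seriesDB are simplified stand-ins for illustration only, not the real tsdb interfaces.

package main

import "fmt"

// Entity is a composite series key; Entry is one segment of it.
type Entry string
type Entity []Entry

// prepend returns a new Entity with the scope entry in front,
// mirroring entity.Prepend(scope) in the code above.
func (e Entity) prepend(scope Entry) Entity {
	return append(Entity{scope}, e...)
}

// seriesDB is a stand-in for a series database.
type seriesDB interface {
	Get(entity Entity) string
}

// rawSeriesDB pretends to look a series up by its full key.
type rawSeriesDB struct{}

func (rawSeriesDB) Get(entity Entity) string {
	return fmt.Sprintf("series for %v", entity)
}

// scopedSeriesDB prefixes every lookup with its scope, the same
// role scopedSeriesDatabase plays above.
type scopedSeriesDB struct {
	scope     Entry
	delegated seriesDB
}

func (s scopedSeriesDB) Get(entity Entity) string {
	return s.delegated.Get(entity.prepend(s.scope))
}

func main() {
	db := scopedSeriesDB{scope: "sw", delegated: rawSeriesDB{}}
	// The caller asks for {"instance-1"}; the underlying database
	// sees {"sw", "instance-1"}.
	fmt.Println(db.Get(Entity{"instance-1"}))
}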
diff --git a/pkg/test/matcher.go b/pkg/test/matcher.go
new file mode 100644
index 0000000..f218bf0
--- /dev/null
+++ b/pkg/test/matcher.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package test
+
+import (
+ "fmt"
+
+ "github.com/golang/mock/gomock"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+var (
+ _ gomock.Matcher = (*shardEventMatcher)(nil)
+ _ gomock.Matcher = (*entityEventMatcher)(nil)
+)
+
+type shardEventMatcher struct {
+ action databasev1.Action
+}
+
+func NewShardEventMatcher(action databasev1.Action) gomock.Matcher {
+ return &shardEventMatcher{
+ action: action,
+ }
+}
+
+func (s *shardEventMatcher) Matches(x interface{}) bool {
+ if m, messageOk := x.(bus.Message); messageOk {
+ if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk {
+ return evt.Action == s.action
+ }
+ }
+
+ return false
+}
+
+func (s *shardEventMatcher) String() string {
+ return fmt.Sprintf("shard-event-matcher(%s)",
databasev1.Action_name[int32(s.action)])
+}
+
+type entityEventMatcher struct {
+ action databasev1.Action
+}
+
+func NewEntityEventMatcher(action databasev1.Action) gomock.Matcher {
+ return &entityEventMatcher{
+ action: action,
+ }
+}
+
+func (s *entityEventMatcher) Matches(x interface{}) bool {
+ if m, messageOk := x.(bus.Message); messageOk {
+ if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk {
+ return evt.Action == s.action
+ }
+ }
+
+ return false
+}
+
+func (s *entityEventMatcher) String() string {
+ return fmt.Sprintf("entity-event-matcher(%s)",
databasev1.Action_name[int32(s.action)])
+}
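The matchers were lifted into pkg/test so the stream and measure suites can share them. All gomock asks of a custom matcher is the two-method Matcher interface (Matches and String); the following stripped-down, runnable sketch checks an action field on a simplified event type instead of the real bus.Message/ShardEvent plumbing.

package main

import (
	"fmt"

	"github.com/golang/mock/gomock"
)

// event is a simplified stand-in for a shard or entity event.
type event struct{ Action string }

// actionMatcher accepts any event whose Action equals the expected one.
type actionMatcher struct {
	want string
}

var _ gomock.Matcher = (*actionMatcher)(nil)

func (m *actionMatcher) Matches(x interface{}) bool {
	evt, ok := x.(event)
	return ok && evt.Action == m.want
}

func (m *actionMatcher) String() string {
	return fmt.Sprintf("action-matcher(%s)", m.want)
}

func main() {
	m := &actionMatcher{want: "ACTION_PUT"}
	fmt.Println(m.Matches(event{Action: "ACTION_PUT"}))    // true
	fmt.Println(m.Matches(event{Action: "ACTION_DELETE"})) // false
	fmt.Println(m.Matches(42))                             // false: not an event
}

In an expectation it then reads as repo.EXPECT().Publish(topic, test.NewShardEventMatcher(...)), exactly as the suite above now does.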
diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go
index c08968e..9f20108 100644
--- a/pkg/test/measure/etcd.go
+++ b/pkg/test/measure/etcd.go
@@ -21,13 +21,13 @@ import (
"context"
"embed"
"fmt"
- "math/rand"
"os"
"path"
"github.com/google/uuid"
"google.golang.org/protobuf/encoding/protojson"
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)
@@ -41,9 +41,18 @@ var (
indexRuleBindingJSON string
//go:embed testdata/measure.json
measureJSON string
+ //go:embed testdata/group.json
+ groupJSON string
)
func PreloadSchema(e schema.Registry) error {
+ g := &commonv1.Group{}
+ if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
+ return err
+ }
+ if err := e.UpdateGroup(context.TODO(), g); err != nil {
+ return err
+ }
s := &databasev1.Measure{}
if err := protojson.Unmarshal([]byte(measureJSON), s); err != nil {
return err
@@ -88,9 +97,3 @@ func PreloadSchema(e schema.Registry) error {
func RandomTempDir() string {
return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", uuid.New().String()))
}
-
-func RandomUnixDomainListener() (string, string) {
- i := rand.Uint64()
- return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i),
- fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
-}
diff --git a/pkg/test/measure/testdata/group.json b/pkg/test/measure/testdata/group.json
new file mode 100644
index 0000000..c2b6c02
--- /dev/null
+++ b/pkg/test/measure/testdata/group.json
@@ -0,0 +1,27 @@
+{
+ "metadata": {
+ "name": "default"
+ },
+ "catalog": "CATALOG_MEASURE",
+ "resource_opts": {
+ "shard_num": 2,
+ "interval_rules": [
+ {
+ "tag_name": "scope",
+ "str": "minute",
+ "interval": "1m"
+ },
+ {
+ "tag_name": "scope",
+ "str": "hour",
+ "interval": "1h"
+ },
+ {
+ "tag_name": "scope",
+ "str": "day",
+ "interval": "1d"
+ }
+ ]
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measure.json b/pkg/test/measure/testdata/measure.json
index a0ab240..47e341a 100644
--- a/pkg/test/measure/testdata/measure.json
+++ b/pkg/test/measure/testdata/measure.json
@@ -43,29 +43,5 @@
"entity_id"
]
},
- "interval_rules": [
- {
- "tag_name": "scope",
- "str": "minute",
- "interval": "1m"
- },
- {
- "tag_name": "scope",
- "str": "hour",
- "interval": "1h"
- },
- {
- "tag_name": "scope",
- "str": "day",
- "interval": "1d"
- }
- ],
- "opts": {
- "shard_num": 2,
- "ttl": {
- "val": 7,
- "unit": "DURATION_UNIT_DAY"
- }
- },
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
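Net effect of the last two files: shard_num and the interval rules move from the per-measure schema into the group's resource_opts, so every measure in the "default" group inherits them. Purely as an illustration, and assuming the interval strings stay in the "1m"/"1h"/"1d" form shown above (which the schema does not strictly guarantee), they could be resolved to durations along these lines; BanyanDB's own resolution logic may differ.

package main

import (
	"fmt"
	"time"
)

// parseInterval is a hypothetical helper: it turns "1m"/"1h"/"1d" style
// strings into durations. time.ParseDuration has no "d" unit, so days are
// handled separately.
func parseInterval(s string) (time.Duration, error) {
	if len(s) > 1 && s[len(s)-1] == 'd' {
		var days int
		if _, err := fmt.Sscanf(s, "%dd", &days); err != nil {
			return 0, err
		}
		return time.Duration(days) * 24 * time.Hour, nil
	}
	return time.ParseDuration(s)
}

func main() {
	for _, s := range []string{"1m", "1h", "1d"} {
		d, err := parseInterval(s)
		fmt.Println(s, d, err)
	}
}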