This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch stream-metadata in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 0288a60e61b05ab556063f057463aa2e7424a3d0 Author: Gao Hongtao <[email protected]> AuthorDate: Thu Feb 24 11:53:57 2022 +0000 Add measure module Signed-off-by: Gao Hongtao <[email protected]> --- banyand/{stream/stream.go => measure/measure.go} | 47 ++--- .../stream_query.go => measure/measure_query.go} | 117 +++-------- banyand/measure/measure_query_test.go | 104 ++++++++++ banyand/measure/measure_suite_test.go | 120 +++++++++++ banyand/measure/measure_write.go | 227 +++++++++++++++++++++ banyand/measure/measure_write_test.go | 79 +++++++ banyand/measure/metadata.go | 207 +++++++++++++++++++ banyand/{stream => measure}/metadata_test.go | 65 +++--- banyand/measure/service.go | 156 ++++++++++++++ banyand/measure/testdata/query_data.json | 107 ++++++++++ banyand/measure/testdata/write_data.json | 107 ++++++++++ banyand/stream/metadata_test.go | 11 +- banyand/stream/stream.go | 13 +- banyand/stream/stream_query.go | 71 +------ banyand/stream/stream_query_test.go | 75 +++++-- banyand/stream/stream_suite_test.go | 47 +---- banyand/tsdb/scope.go | 85 ++++++++ pkg/test/matcher.go | 80 ++++++++ pkg/test/measure/etcd.go | 17 +- pkg/test/measure/testdata/group.json | 27 +++ pkg/test/measure/testdata/measure.json | 24 --- 21 files changed, 1460 insertions(+), 326 deletions(-) diff --git a/banyand/stream/stream.go b/banyand/measure/measure.go similarity index 69% copy from banyand/stream/stream.go copy to banyand/measure/measure.go index 2856938..481197d 100644 --- a/banyand/stream/stream.go +++ b/banyand/measure/measure.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package stream +package measure import ( "context" @@ -26,21 +26,18 @@ import ( "github.com/apache/skywalking-banyandb/banyand/tsdb/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" - "github.com/apache/skywalking-banyandb/pkg/schema" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) // a chunk is 1MB const chunkSize = 1 << 20 -var _ schema.Resource = (*stream)(nil) - -type stream struct { +type measure struct { name string group string shardNum uint32 l *logger.Logger - // schema is the reference to the spec of the stream - schema *databasev1.Stream + schema *databasev1.Measure // maxObservedModRevision is the max observed revision of index rules in the spec maxObservedModRevision int64 db tsdb.Supplier @@ -49,49 +46,43 @@ type stream struct { indexWriter *index.Writer } -func (s *stream) GetMetadata() *commonv1.Metadata { +func (s *measure) GetSchema() *databasev1.Measure { + return s.schema +} + +func (s *measure) GetMetadata() *commonv1.Metadata { return s.schema.Metadata } -func (s *stream) GetIndexRules() []*databasev1.IndexRule { +func (s *measure) GetIndexRules() []*databasev1.IndexRule { return s.indexRules } -func (s *stream) MaxObservedModRevision() int64 { +func (s *measure) MaxObservedModRevision() int64 { return s.maxObservedModRevision } -func (s *stream) EntityLocator() partition.EntityLocator { +func (s *measure) EntityLocator() partition.EntityLocator { return s.entityLocator } -func (s *stream) Close() error { +func (s *measure) Close() error { return s.indexWriter.Close() } -func parseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) { - maxRevisionForIdxRules = int64(0) - for _, idxRule := range indexRules { - if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules { - maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision() - } - } - return -} - -func (s *stream) parseSpec() { +func (s *measure) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity()) - s.maxObservedModRevision = parseMaxModRevision(s.indexRules) + s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules) } -type streamSpec struct { - schema *databasev1.Stream +type measureSpec struct { + schema *databasev1.Measure indexRules []*databasev1.IndexRule } -func openStream(shardNum uint32, db tsdb.Supplier, spec streamSpec, l *logger.Logger) (*stream, error) { - sm := &stream{ +func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger) (*measure, error) { + sm := &measure{ shardNum: shardNum, schema: spec.schema, indexRules: spec.indexRules, diff --git a/banyand/stream/stream_query.go b/banyand/measure/measure_query.go similarity index 53% copy from banyand/stream/stream_query.go copy to banyand/measure/measure_query.go index 2896f0d..4d482b2 100644 --- a/banyand/stream/stream_query.go +++ b/banyand/measure/measure_query.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package stream +package measure import ( "io" @@ -26,10 +26,11 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/partition" + resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) var ( @@ -37,25 +38,27 @@ var ( ) type Query interface { - Stream(stream *commonv1.Metadata) (Stream, error) + LoadGroup(name string) (resourceSchema.Group, bool) + Measure(measure *commonv1.Metadata) (Measure, error) } -type Stream interface { +type Measure interface { io.Closer - Write(value *streamv1.ElementValue) error + Write(value *measurev1.DataPointValue) error Shards(entity tsdb.Entity) ([]tsdb.Shard, error) Shard(id common.ShardID) (tsdb.Shard, error) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) - ParseElementID(item tsdb.Item) (string, error) + ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, error) + GetSchema() *databasev1.Measure } -var _ Stream = (*stream)(nil) +var _ Measure = (*measure)(nil) -func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { +func (s *measure) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { wrap := func(shards []tsdb.Shard) []tsdb.Shard { result := make([]tsdb.Shard, len(shards)) for i := 0; i < len(shards); i++ { - result[i] = newShardDelegate(tsdb.Entry(s.name), shards[i]) + result[i] = tsdb.NewScopedShard(tsdb.Entry(s.name), shards[i]) } return result } @@ -76,21 +79,21 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { if err != nil { return nil, err } - return []tsdb.Shard{newShardDelegate(tsdb.Entry(s.name), shard)}, nil + return []tsdb.Shard{tsdb.NewScopedShard(tsdb.Entry(s.name), shard)}, nil } -func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) { +func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) { shard, err := s.db.SupplyTSDB().Shard(id) if err != nil { return nil, err } - return newShardDelegate(tsdb.Entry(s.name), shard), nil + return tsdb.NewScopedShard(tsdb.Entry(s.name), shard), nil } -func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) { - familyRawBytes, err := item.Family(family) +func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) { + familyRawBytes, err := item.Family(string(familyIdentity(family, TagFlag))) if err != nil { - return nil, errors.Wrapf(err, "parse family %s", family) + return nil, err } tagFamily := &modelv1.TagFamilyForWrite{} err = proto.Unmarshal(familyRawBytes, tagFamily) @@ -121,75 +124,21 @@ func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFami }, err } -func (s *stream) ParseElementID(item tsdb.Item) (string, error) { - rawBytes, err := item.Val() - if err != nil { - return "", err - } - return string(rawBytes), nil -} - -var _ tsdb.Shard = (*shardDelegate)(nil) - -type shardDelegate struct { - scope tsdb.Entry - delegated tsdb.Shard -} - -func newShardDelegate(scope tsdb.Entry, delegated tsdb.Shard) tsdb.Shard { - return &shardDelegate{ - scope: scope, - delegated: delegated, +func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, error) { + var fieldSpec *databasev1.FieldSpec + for _, spec := range s.schema.GetFields() { + if spec.GetName() == name { + fieldSpec = spec + break + } } -} - -func (sd *shardDelegate) Close() error { - // the delegate can't close the underlying shard - return nil -} - -func (sd *shardDelegate) ID() common.ShardID { - return sd.delegated.ID() -} - -func (sd *shardDelegate) Series() tsdb.SeriesDatabase { - return &seriesDatabaseDelegate{ - scope: sd.scope, - delegated: sd.delegated.Series(), + bytes, err := item.Family(string(familyIdentity(name, encoderFieldFlag(fieldSpec)))) + if err != nil { + return nil, err } -} - -func (sd *shardDelegate) Index() tsdb.IndexDatabase { - return sd.delegated.Index() -} - -func (sd *shardDelegate) State() tsdb.ShardState { - return sd.delegated.State() -} - -var _ tsdb.SeriesDatabase = (*seriesDatabaseDelegate)(nil) - -type seriesDatabaseDelegate struct { - scope tsdb.Entry - delegated tsdb.SeriesDatabase -} - -func (sdd *seriesDatabaseDelegate) Close() error { - return nil -} - -func (sdd *seriesDatabaseDelegate) GetByHashKey(key []byte) (tsdb.Series, error) { - return sdd.delegated.GetByHashKey(key) -} - -func (sdd *seriesDatabaseDelegate) GetByID(id common.SeriesID) (tsdb.Series, error) { - return sdd.delegated.GetByID(id) -} - -func (sdd *seriesDatabaseDelegate) Get(entity tsdb.Entity) (tsdb.Series, error) { - return sdd.delegated.Get(entity.Prepend(sdd.scope)) -} - -func (sdd *seriesDatabaseDelegate) List(path tsdb.Path) (tsdb.SeriesList, error) { - return sdd.delegated.List(path.Prepand(sdd.scope)) + fieldValue := decodeFieldValue(bytes, fieldSpec) + return &measurev1.DataPoint_Field{ + Name: name, + Value: fieldValue, + }, err } diff --git a/banyand/measure/measure_query_test.go b/banyand/measure/measure_query_test.go new file mode 100644 index 0000000..3a9828d --- /dev/null +++ b/banyand/measure/measure_query_test.go @@ -0,0 +1,104 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreementmeasure. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure" + "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +var _ = Describe("Write", func() { + var svcs *services + var deferFn func() + var measure measure.Measure + + BeforeEach(func() { + svcs, deferFn = setUp() + var err error + measure, err = svcs.measure.Measure(&commonv1.Metadata{ + Name: "cpm", + Group: "default", + }) + Expect(err).ShouldNot(HaveOccurred()) + }) + AfterEach(func() { + deferFn() + }) + It("queries data", func() { + baseTime := writeData("query_data.json", measure) + shard, err := measure.Shard(0) + Expect(err).ShouldNot(HaveOccurred()) + series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")}) + Expect(err).ShouldNot(HaveOccurred()) + seriesSpan, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour)) + defer func(seriesSpan tsdb.SeriesSpan) { + _ = seriesSpan.Close() + }(seriesSpan) + Expect(err).ShouldNot(HaveOccurred()) + seeker, err := seriesSpan.SeekerBuilder().OrderByTime(modelv1.Sort_SORT_DESC).Build() + Expect(err).ShouldNot(HaveOccurred()) + iter, err := seeker.Seek() + Expect(err).ShouldNot(HaveOccurred()) + Expect(len(iter)).To(Equal(1)) + defer func(iterator tsdb.Iterator) { + _ = iterator.Close() + }(iter[0]) + i := 0 + expectedFields := [][]int64{{150, 300, 5}, {200, 50, 4}, {100, 100, 1}} + for ; iter[0].Next(); i++ { + item := iter[0].Val() + tagFamily, err := measure.ParseTagFamily("default", item) + Expect(err).ShouldNot(HaveOccurred()) + Expect(len(tagFamily.Tags)).To(Equal(2)) + summation, err := measure.ParseField("summation", item) + Expect(err).ShouldNot(HaveOccurred()) + Expect(summation.GetValue().GetInt().Value).To(Equal(expectedFields[i][0])) + count, err := measure.ParseField("count", item) + Expect(err).ShouldNot(HaveOccurred()) + Expect(count.GetValue().GetInt().Value).To(Equal(expectedFields[i][1])) + value, err := measure.ParseField("value", item) + Expect(err).ShouldNot(HaveOccurred()) + Expect(value.GetValue().GetInt().Value).To(Equal(expectedFields[i][2])) + } + }) +}) diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go new file mode 100644 index 0000000..c8c80fb --- /dev/null +++ b/banyand/measure/measure_suite_test.go @@ -0,0 +1,120 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package measure_test + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + "github.com/apache/skywalking-banyandb/api/event" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/measure" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure" +) + +func TestMeasure(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "Measure Suite") +} + +// BeforeSuite - Init logger +var _ = ginkgo.BeforeSuite(func() { + gomega.Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: "info", + })).To(gomega.Succeed()) +}) + +// service to preload measure +type preloadMeasureService struct { + metaSvc metadata.Service +} + +func (p *preloadMeasureService) Name() string { + return "preload-measure" +} + +func (p *preloadMeasureService) PreRun() error { + return testmeasure.PreloadSchema(p.metaSvc.SchemaRegistry()) +} + +type services struct { + measure measure.Service + metadataService metadata.Service + repo *discovery.MockServiceRepo + pipeline queue.Queue +} + +func setUp() (*services, func()) { + ctrl := gomock.NewController(ginkgo.GinkgoT()) + gomega.Expect(ctrl).ShouldNot(gomega.BeNil()) + // Init Discovery + repo := discovery.NewMockServiceRepo(ctrl) + repo.EXPECT().NodeID().AnyTimes() + // Both PreRun and Serve phases send events + repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 1) + repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2) + + // Init Pipeline + pipeline, err := queue.NewQueue(context.TODO(), repo) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Init Metadata Service + metadataService, err := metadata.NewService(context.TODO()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Init Measure Service + measureService, err := measure.NewService(context.TODO(), metadataService, repo, pipeline) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService} + var flags []string + metaPath, metaDeferFunc, err := test.NewSpace() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + flags = append(flags, "--metadata-root-path="+metaPath) + rootPath, deferFunc, err := test.NewSpace() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + flags = append(flags, "--root-path="+rootPath) + moduleDeferFunc := test.SetUpModules( + flags, + repo, + pipeline, + metadataService, + preloadMeasureSvc, + measureService, + ) + return &services{ + measure: measureService, + metadataService: metadataService, + repo: repo, + pipeline: pipeline, + }, func() { + moduleDeferFunc() + metaDeferFunc() + deferFunc() + } +} diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go new file mode 100644 index 0000000..5b777e7 --- /dev/null +++ b/banyand/measure/measure_write.go @@ -0,0 +1,227 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "bytes" + + "github.com/pkg/errors" + "google.golang.org/protobuf/proto" + + "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/banyand/tsdb/index" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +var ( + ErrMalformedElement = errors.New("element is malformed") +) + +const ( + TagFlag byte = iota +) + +func (s *measure) Write(value *measurev1.DataPointValue) error { + entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum) + if err != nil { + return err + } + waitCh := make(chan struct{}) + err = s.write(shardID, tsdb.HashEntity(entity), value, func() { + close(waitCh) + }) + if err != nil { + close(waitCh) + return err + } + <-waitCh + return nil +} + +func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error { + sm := s.schema + fLen := len(value.GetTagFamilies()) + if fLen < 1 { + return errors.Wrap(ErrMalformedElement, "no tag family") + } + if fLen > len(sm.TagFamilies) { + return errors.Wrap(ErrMalformedElement, "tag family number is more than expected") + } + shard, err := s.db.SupplyTSDB().Shard(shardID) + if err != nil { + return err + } + series, err := shard.Series().GetByHashKey(seriesHashKey) + if err != nil { + return err + } + t := value.GetTimestamp().AsTime() + wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0)) + if err != nil { + if wp != nil { + _ = wp.Close() + } + return err + } + writeFn := func() (tsdb.Writer, error) { + builder := wp.WriterBuilder().Time(t) + for fi, family := range value.GetTagFamilies() { + familySpec := sm.GetTagFamilies()[fi] + if len(family.GetTags()) > len(familySpec.GetTags()) { + return nil, errors.Wrap(ErrMalformedElement, "tag number is more than expected") + } + for ti, tag := range family.GetTags() { + tagSpec := familySpec.GetTags()[ti] + tType, isNull := pbv1.TagValueTypeConv(tag) + if isNull { + continue + } + if tType != tagSpec.GetType() { + return nil, errors.Wrapf(ErrMalformedElement, "tag %s type is unexpected", tagSpec.GetName()) + } + } + bb, errMarshal := proto.Marshal(family) + if errMarshal != nil { + return nil, errMarshal + } + builder.Family(familyIdentity(sm.GetTagFamilies()[fi].GetName(), TagFlag), bb) + } + if len(value.GetFields()) > len(sm.GetFields()) { + return nil, errors.Wrap(ErrMalformedElement, "fields number is more than expected") + } + for fi, fieldValue := range value.GetFields() { + fieldSpec := sm.GetFields()[fi] + fType, isNull := pbv1.FieldValueTypeConv(fieldValue) + if isNull { + continue + } + if fType != fieldSpec.GetFieldType() { + return nil, errors.Wrapf(ErrMalformedElement, "field %s type is unexpected", fieldSpec.GetName()) + } + data := encodeFieldValue(fieldValue) + if data == nil { + continue + } + builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec)), data) + } + writer, errWrite := builder.Build() + if errWrite != nil { + return nil, errWrite + } + _, errWrite = writer.Write() + s.l.Debug(). + Time("ts", t). + Int("ts_nano", t.Nanosecond()). + Interface("data", value). + Uint64("series_id", uint64(series.ID())). + Uint64("item_id", uint64(writer.ItemID().ID)). + Int("shard_id", int(shardID)). + Msg("write measure") + return writer, errWrite + } + writer, err := writeFn() + if err != nil { + _ = wp.Close() + return err + } + m := index.Message{ + LocalWriter: writer, + Value: index.Value{ + TagFamilies: value.GetTagFamilies(), + Timestamp: value.GetTimestamp().AsTime(), + }, + BlockCloser: wp, + Cb: cb, + } + s.indexWriter.Write(m) + return err +} + +type writeCallback struct { + l *logger.Logger + schemaRepo *schemaRepo +} + +func setUpWriteCallback(l *logger.Logger, schemaRepo *schemaRepo) *writeCallback { + wcb := &writeCallback{ + l: l, + schemaRepo: schemaRepo, + } + return wcb +} + +func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { + writeEvent, ok := message.Data().(*measurev1.InternalWriteRequest) + if !ok { + w.l.Warn().Msg("invalid event data type") + return + } + + stm, ok := w.schemaRepo.loadMeasure(writeEvent.GetRequest().GetMetadata()) + if !ok { + w.l.Warn().Msg("cannot find measure definition") + return + } + err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil) + if err != nil { + w.l.Debug().Err(err).Msg("fail to write entity") + } + return +} + +func familyIdentity(name string, flag byte) []byte { + return bytes.Join([][]byte{[]byte(name), {flag}}, nil) +} + +func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte { + switch fieldValue.GetValue().(type) { + case *modelv1.FieldValue_Int: + return convert.Int64ToBytes(fieldValue.GetInt().GetValue()) + case *modelv1.FieldValue_Str: + return []byte(fieldValue.GetStr().Value) + case *modelv1.FieldValue_BinaryData: + return fieldValue.GetBinaryData() + } + return nil +} + +func decodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *modelv1.FieldValue { + switch fieldSpec.GetFieldType() { + case databasev1.FieldType_FIELD_TYPE_STRING: + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: string(fieldValue)}}} + case databasev1.FieldType_FIELD_TYPE_INT: + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: convert.BytesToInt64(fieldValue)}}} + case databasev1.FieldType_FIELD_TYPE_DATA_BINARY: + return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: fieldValue}} + } + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}} +} + +func encoderFieldFlag(fieldSpec *databasev1.FieldSpec) byte { + encodingMethod := byte(fieldSpec.GetEncodingMethod().Number()) + compressionMethod := byte(fieldSpec.GetCompressionMethod().Number()) + return encodingMethod<<4 | compressionMethod +} diff --git a/banyand/measure/measure_write_test.go b/banyand/measure/measure_write_test.go new file mode 100644 index 0000000..eceaaff --- /dev/null +++ b/banyand/measure/measure_write_test.go @@ -0,0 +1,79 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure_test + +import ( + "embed" + "encoding/json" + "time" + + "github.com/golang/protobuf/jsonpb" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + "github.com/apache/skywalking-banyandb/banyand/measure" +) + +var _ = Describe("Write", func() { + var svcs *services + var deferFn func() + var measure measure.Measure + + BeforeEach(func() { + svcs, deferFn = setUp() + var err error + measure, err = svcs.measure.Measure(&commonv1.Metadata{ + Name: "cpm", + Group: "default", + }) + Expect(err).ShouldNot(HaveOccurred()) + }) + AfterEach(func() { + deferFn() + }) + It("writes data", func() { + writeData("write_data.json", measure) + }) +}) + +//go:embed testdata/*.json +var dataFS embed.FS + +func writeData(dataFile string, measure measure.Measure) (baseTime time.Time) { + var templates []interface{} + baseTime = time.Now() + content, err := dataFS.ReadFile("testdata/" + dataFile) + Expect(err).ShouldNot(HaveOccurred()) + Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred()) + now := time.Now() + for i, template := range templates { + rawDataPointValue, errMarshal := json.Marshal(template) + Expect(errMarshal).ShouldNot(HaveOccurred()) + dataPointValue := &measurev1.DataPointValue{} + if dataPointValue.Timestamp == nil { + dataPointValue.Timestamp = timestamppb.New(now.Add(time.Duration(i) * time.Minute)) + } + Expect(jsonpb.UnmarshalString(string(rawDataPointValue), dataPointValue)).ShouldNot(HaveOccurred()) + errInner := measure.Write(dataPointValue) + Expect(errInner).ShouldNot(HaveOccurred()) + } + return baseTime +} diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go new file mode 100644 index 0000000..2a2e96e --- /dev/null +++ b/banyand/measure/metadata.go @@ -0,0 +1,207 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package measure + +import ( + "context" + "time" + + "github.com/apache/skywalking-banyandb/api/event" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/logger" + resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" +) + +type schemaRepo struct { + resourceSchema.Repository + l *logger.Logger + metadata metadata.Repo +} + +func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo { + return schemaRepo{ + l: l, + metadata: metadata, + Repository: resourceSchema.NewRepository( + metadata, + repo, + l, + newSupplier(path, metadata, l), + event.MeasureTopicShardEvent, + event.MeasureTopicEntityEvent, + ), + } +} + +func (sr *schemaRepo) OnAddOrUpdate(m schema.Metadata) { + switch m.Kind { + case schema.KindGroup: + g := m.Spec.(*commonv1.Group) + if g.Catalog != commonv1.Catalog_CATALOG_MEASURE { + return + } + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventAddOrUpdate, + Kind: resourceSchema.EventKindGroup, + Metadata: g.GetMetadata(), + }) + case schema.KindMeasure: + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventAddOrUpdate, + Kind: resourceSchema.EventKindResource, + Metadata: m.Spec.(*databasev1.Measure).GetMetadata(), + }) + case schema.KindIndexRuleBinding: + irb, ok := m.Spec.(*databasev1.IndexRuleBinding) + if !ok { + sr.l.Warn().Msg("fail to convert message to IndexRuleBinding") + return + } + if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{ + Name: irb.GetSubject().GetName(), + Group: m.Group, + }) + cancel() + if err != nil { + sr.l.Error().Err(err).Msg("fail to get subject") + return + } + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventAddOrUpdate, + Kind: resourceSchema.EventKindResource, + Metadata: stm.GetMetadata(), + }) + } + case schema.KindIndexRule: + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + subjects, err := sr.metadata.Subjects(ctx, m.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_MEASURE) + cancel() + if err != nil { + sr.l.Error().Err(err).Msg("fail to get subjects(measure)") + return + } + for _, sub := range subjects { + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventAddOrUpdate, + Kind: resourceSchema.EventKindResource, + Metadata: sub.(*databasev1.Measure).GetMetadata(), + }) + } + default: + } +} + +func (sr *schemaRepo) OnDelete(m schema.Metadata) { + switch m.Kind { + case schema.KindGroup: + g := m.Spec.(*commonv1.Group) + if g.Catalog != commonv1.Catalog_CATALOG_MEASURE { + return + } + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventDelete, + Kind: resourceSchema.EventKindGroup, + Metadata: g.GetMetadata(), + }) + case schema.KindMeasure: + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventDelete, + Kind: resourceSchema.EventKindResource, + Metadata: m.Spec.(*databasev1.Measure).GetMetadata(), + }) + case schema.KindIndexRuleBinding: + if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{ + Name: m.Name, + Group: m.Group, + }) + cancel() + if err != nil { + sr.l.Error().Err(err).Msg("fail to get subject") + return + } + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventDelete, + Kind: resourceSchema.EventKindResource, + Metadata: stm.GetMetadata(), + }) + } + case schema.KindIndexRule: + default: + } +} + +func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) { + r, ok := sr.LoadResource(metadata) + if !ok { + return nil, false + } + s, ok := r.(*measure) + return s, ok +} + +var _ resourceSchema.ResourceSupplier = (*supplier)(nil) + +type supplier struct { + path string + metadata metadata.Repo + l *logger.Logger +} + +func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier { + return &supplier{ + path: path, + metadata: metadata, + l: l, + } +} + +func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resourceSchema.ResourceSpec) (resourceSchema.Resource, error) { + measureSchema := spec.Schema.(*databasev1.Measure) + return openMeasure(shardNum, db, measureSpec{ + schema: measureSchema, + indexRules: spec.IndexRules, + }, s.l) +} +func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return s.metadata.MeasureRegistry().GetMeasure(ctx, md) +} + +func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { + return tsdb.OpenDatabase( + context.TODO(), + tsdb.DatabaseOpts{ + Location: s.path, + ShardNum: groupSchema.ResourceOpts.ShardNum, + EncodingMethod: tsdb.EncodingMethod{ + EncoderPool: encoding.NewPlainEncoderPool(chunkSize), + DecoderPool: encoding.NewPlainDecoderPool(chunkSize), + }, + }) +} diff --git a/banyand/stream/metadata_test.go b/banyand/measure/metadata_test.go similarity index 60% copy from banyand/stream/metadata_test.go copy to banyand/measure/metadata_test.go index 216bbd4..0dc8120 100644 --- a/banyand/stream/metadata_test.go +++ b/banyand/measure/metadata_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package stream +package measure_test import ( "context" @@ -27,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/api/event" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/test" ) var _ = Describe("Metadata", func() { @@ -44,24 +45,24 @@ var _ = Describe("Metadata", func() { Context("Manage group", func() { It("should pass smoke test", func() { Eventually(func() bool { - _, ok := svcs.stream.schemaRepo.LoadGroup("default") + _, ok := svcs.measure.LoadGroup("default") return ok }).WithTimeout(10 * time.Second).Should(BeTrue()) }) It("should close the group", func() { - svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2) + svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2) deleted, err := svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default") Expect(err).ShouldNot(HaveOccurred()) Expect(deleted).Should(BeTrue()) Eventually(func() bool { - _, ok := svcs.stream.schemaRepo.LoadGroup("default") + _, ok := svcs.measure.LoadGroup("default") return ok }).WithTimeout(10 * time.Second).Should(BeFalse()) }) It("should add shards", func() { - svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2) - svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(4) + svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2) + svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4) groupSchema, err := svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "default") Expect(err).ShouldNot(HaveOccurred()) Expect(groupSchema).ShouldNot(BeNil()) @@ -70,7 +71,7 @@ var _ = Describe("Metadata", func() { Expect(svcs.metadataService.GroupRegistry().UpdateGroup(context.TODO(), groupSchema)).Should(Succeed()) Eventually(func() bool { - group, ok := svcs.stream.schemaRepo.LoadGroup("default") + group, ok := svcs.measure.LoadGroup("default") if !ok { return false } @@ -79,65 +80,65 @@ var _ = Describe("Metadata", func() { }) }) - Context("Manage stream", func() { + Context("Manage measure", func() { It("should pass smoke test", func() { Eventually(func() bool { - _, ok := svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{ - Name: "sw", + _, err := svcs.measure.Measure(&commonv1.Metadata{ + Name: "cpm", Group: "default", }) - return ok + return err == nil }).WithTimeout(10 * time.Second).Should(BeTrue()) }) - It("should close the stream", func() { - svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(1) - deleted, err := svcs.metadataService.StreamRegistry().DeleteStream(context.TODO(), &commonv1.Metadata{ - Name: "sw", + It("should close the measure", func() { + svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1) + deleted, err := svcs.metadataService.MeasureRegistry().DeleteMeasure(context.TODO(), &commonv1.Metadata{ + Name: "cpm", Group: "default", }) Expect(err).ShouldNot(HaveOccurred()) Expect(deleted).Should(BeTrue()) Eventually(func() bool { - _, ok := svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{ - Name: "sw", + _, err := svcs.measure.Measure(&commonv1.Metadata{ + Name: "cpm", Group: "default", }) - return ok + return err != nil }).WithTimeout(10 * time.Second).Should(BeFalse()) }) - Context("Update a stream", func() { - var streamSchema *databasev1.Stream + Context("Update a measure", func() { + var measureSchema *databasev1.Measure BeforeEach(func() { var err error - streamSchema, err = svcs.metadataService.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{ - Name: "sw", + measureSchema, err = svcs.metadataService.MeasureRegistry().GetMeasure(context.TODO(), &commonv1.Metadata{ + Name: "cpm", Group: "default", }) Expect(err).ShouldNot(HaveOccurred()) - Expect(streamSchema).ShouldNot(BeNil()) + Expect(measureSchema).ShouldNot(BeNil()) }) - It("should update a new stream", func() { - svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(1) + It("should update a new measure", func() { + svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1) // Remove the first tag from the entity - streamSchema.Entity.TagNames = streamSchema.Entity.TagNames[1:] - entitySize := len(streamSchema.Entity.TagNames) + measureSchema.Entity.TagNames = measureSchema.Entity.TagNames[1:] + entitySize := len(measureSchema.Entity.TagNames) - Expect(svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), streamSchema)).Should(Succeed()) + Expect(svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(), measureSchema)).Should(Succeed()) Eventually(func() bool { - val, ok := svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{ - Name: "sw", + val, err := svcs.measure.Measure(&commonv1.Metadata{ + Name: "cpm", Group: "default", }) - if !ok { + if err != nil { return false } - return len(val.schema.GetEntity().TagNames) == entitySize + return len(val.GetSchema().GetEntity().TagNames) == entitySize }).WithTimeout(10 * time.Second).Should(BeTrue()) }) }) diff --git a/banyand/measure/service.go b/banyand/measure/service.go new file mode 100644 index 0000000..0181409 --- /dev/null +++ b/banyand/measure/service.go @@ -0,0 +1,156 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/apache/skywalking-banyandb/api/data" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" +) + +var ( + ErrEmptyRootPath = errors.New("root path is empty") + ErrMeasureNotExist = errors.New("measure doesn't exist") +) + +type Service interface { + run.PreRunner + run.Config + run.Service + Query +} + +var _ Service = (*service)(nil) + +type service struct { + schemaRepo schemaRepo + writeListener *writeCallback + l *logger.Logger + metadata metadata.Repo + root string + pipeline queue.Queue + repo discovery.ServiceRepo + // stop channel for the service + stopCh chan struct{} +} + +func (s *service) Measure(metadata *commonv1.Metadata) (Measure, error) { + sm, ok := s.schemaRepo.loadMeasure(metadata) + if !ok { + return nil, errors.WithStack(ErrMeasureNotExist) + } + return sm, nil +} +func (s *service) LoadGroup(name string) (resourceSchema.Group, bool) { + return s.schemaRepo.LoadGroup(name) +} + +func (s *service) FlagSet() *run.FlagSet { + flagS := run.NewFlagSet("storage") + flagS.StringVar(&s.root, "root-path", "/tmp", "the root path of database") + return flagS +} + +func (s *service) Validate() error { + if s.root == "" { + return ErrEmptyRootPath + } + return nil +} + +func (s *service) Name() string { + return "measure" +} + +func (s *service) PreRun() error { + s.l = logger.GetLogger(s.Name()) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + groups, err := s.metadata.GroupRegistry().ListGroup(ctx) + cancel() + if err != nil { + return err + } + s.schemaRepo = newSchemaRepo(s.root, s.metadata, s.repo, s.l) + for _, g := range groups { + if g.Catalog != commonv1.Catalog_CATALOG_MEASURE { + continue + } + gp, err := s.schemaRepo.StoreGroup(g.Metadata) + if err != nil { + return err + } + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + schemas, err := s.metadata.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group: gp.GetSchema().GetMetadata().Name}) + cancel() + if err != nil { + return err + } + for _, sa := range schemas { + if _, innerErr := gp.StoreResource(sa); innerErr != nil { + return innerErr + } + } + } + + s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo) + + errWrite := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener) + if errWrite != nil { + return errWrite + } + return nil +} + +func (s *service) Serve() run.StopNotify { + _ = s.schemaRepo.NotifyAll() + // run a serial watcher + go s.schemaRepo.Watcher() + + s.metadata.MeasureRegistry().RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule, + &s.schemaRepo) + + s.stopCh = make(chan struct{}) + return s.stopCh +} + +func (s *service) GracefulStop() { + s.schemaRepo.Close() + if s.stopCh != nil { + close(s.stopCh) + } +} + +// NewService returns a new service +func NewService(_ context.Context, metadata metadata.Repo, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) { + return &service{ + metadata: metadata, + repo: repo, + pipeline: pipeline, + }, nil +} diff --git a/banyand/measure/testdata/query_data.json b/banyand/measure/testdata/query_data.json new file mode 100644 index 0000000..4eb3e7e --- /dev/null +++ b/banyand/measure/testdata/query_data.json @@ -0,0 +1,107 @@ +[ + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "str": { + "value": "minute" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 1 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "str": { + "value": "minute" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 200 + } + }, + { + "int": { + "value": 50 + } + }, + { + "int": { + "value": 4 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "str": { + "value": "minute" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 150 + } + }, + { + "int": { + "value": 300 + } + }, + { + "int": { + "value": 5 + } + } + ] + } +] \ No newline at end of file diff --git a/banyand/measure/testdata/write_data.json b/banyand/measure/testdata/write_data.json new file mode 100644 index 0000000..fbfe02f --- /dev/null +++ b/banyand/measure/testdata/write_data.json @@ -0,0 +1,107 @@ +[ + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "str": { + "value": "minute" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 1 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "str": { + "value": "minute" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 1 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "str": { + "value": "minute" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 1 + } + } + ] + } +] \ No newline at end of file diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go index 216bbd4..a980b2c 100644 --- a/banyand/stream/metadata_test.go +++ b/banyand/stream/metadata_test.go @@ -27,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/api/event" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/test" ) var _ = Describe("Metadata", func() { @@ -49,7 +50,7 @@ var _ = Describe("Metadata", func() { }).WithTimeout(10 * time.Second).Should(BeTrue()) }) It("should close the group", func() { - svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2) + svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2) deleted, err := svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default") Expect(err).ShouldNot(HaveOccurred()) Expect(deleted).Should(BeTrue()) @@ -60,8 +61,8 @@ var _ = Describe("Metadata", func() { }) It("should add shards", func() { - svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2) - svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(4) + svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2) + svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4) groupSchema, err := svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "default") Expect(err).ShouldNot(HaveOccurred()) Expect(groupSchema).ShouldNot(BeNil()) @@ -90,7 +91,7 @@ var _ = Describe("Metadata", func() { }).WithTimeout(10 * time.Second).Should(BeTrue()) }) It("should close the stream", func() { - svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(1) + svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1) deleted, err := svcs.metadataService.StreamRegistry().DeleteStream(context.TODO(), &commonv1.Metadata{ Name: "sw", Group: "default", @@ -121,7 +122,7 @@ var _ = Describe("Metadata", func() { }) It("should update a new stream", func() { - svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(1) + svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1) // Remove the first tag from the entity streamSchema.Entity.TagNames = streamSchema.Entity.TagNames[1:] entitySize := len(streamSchema.Entity.TagNames) diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 2856938..83854ac 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -26,6 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/tsdb/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/schema" ) @@ -69,20 +70,10 @@ func (s *stream) Close() error { return s.indexWriter.Close() } -func parseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) { - maxRevisionForIdxRules = int64(0) - for _, idxRule := range indexRules { - if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules { - maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision() - } - } - return -} - func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity()) - s.maxObservedModRevision = parseMaxModRevision(s.indexRules) + s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules) } type streamSpec struct { diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go index 2896f0d..493966f 100644 --- a/banyand/stream/stream_query.go +++ b/banyand/stream/stream_query.go @@ -55,7 +55,7 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { wrap := func(shards []tsdb.Shard) []tsdb.Shard { result := make([]tsdb.Shard, len(shards)) for i := 0; i < len(shards); i++ { - result[i] = newShardDelegate(tsdb.Entry(s.name), shards[i]) + result[i] = tsdb.NewScopedShard(tsdb.Entry(s.name), shards[i]) } return result } @@ -76,7 +76,7 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { if err != nil { return nil, err } - return []tsdb.Shard{newShardDelegate(tsdb.Entry(s.name), shard)}, nil + return []tsdb.Shard{tsdb.NewScopedShard(tsdb.Entry(s.name), shard)}, nil } func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) { @@ -84,7 +84,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) { if err != nil { return nil, err } - return newShardDelegate(tsdb.Entry(s.name), shard), nil + return tsdb.NewScopedShard(tsdb.Entry(s.name), shard), nil } func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) { @@ -128,68 +128,3 @@ func (s *stream) ParseElementID(item tsdb.Item) (string, error) { } return string(rawBytes), nil } - -var _ tsdb.Shard = (*shardDelegate)(nil) - -type shardDelegate struct { - scope tsdb.Entry - delegated tsdb.Shard -} - -func newShardDelegate(scope tsdb.Entry, delegated tsdb.Shard) tsdb.Shard { - return &shardDelegate{ - scope: scope, - delegated: delegated, - } -} - -func (sd *shardDelegate) Close() error { - // the delegate can't close the underlying shard - return nil -} - -func (sd *shardDelegate) ID() common.ShardID { - return sd.delegated.ID() -} - -func (sd *shardDelegate) Series() tsdb.SeriesDatabase { - return &seriesDatabaseDelegate{ - scope: sd.scope, - delegated: sd.delegated.Series(), - } -} - -func (sd *shardDelegate) Index() tsdb.IndexDatabase { - return sd.delegated.Index() -} - -func (sd *shardDelegate) State() tsdb.ShardState { - return sd.delegated.State() -} - -var _ tsdb.SeriesDatabase = (*seriesDatabaseDelegate)(nil) - -type seriesDatabaseDelegate struct { - scope tsdb.Entry - delegated tsdb.SeriesDatabase -} - -func (sdd *seriesDatabaseDelegate) Close() error { - return nil -} - -func (sdd *seriesDatabaseDelegate) GetByHashKey(key []byte) (tsdb.Series, error) { - return sdd.delegated.GetByHashKey(key) -} - -func (sdd *seriesDatabaseDelegate) GetByID(id common.SeriesID) (tsdb.Series, error) { - return sdd.delegated.GetByID(id) -} - -func (sdd *seriesDatabaseDelegate) Get(entity tsdb.Entity) (tsdb.Series, error) { - return sdd.delegated.Get(entity.Prepend(sdd.scope)) -} - -func (sdd *seriesDatabaseDelegate) List(path tsdb.Path) (tsdb.SeriesList, error) { - return sdd.delegated.List(path.Prepand(sdd.scope)) -} diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go index cca1637..cae08e6 100644 --- a/banyand/stream/stream_query_test.go +++ b/banyand/stream/stream_query_test.go @@ -54,26 +54,27 @@ type shardStruct struct { type shardsForTest []shardStruct var _ = Describe("Write", func() { - var ( - s *stream - deferFn func() - ) - BeforeEach(func() { - var svcs *services - svcs, deferFn = setUp() - var ok bool - s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{ - Name: "sw", - Group: "default", + Context("Select shard", func() { + var ( + s *stream + deferFn func() + ) + + BeforeEach(func() { + var svcs *services + svcs, deferFn = setUp() + var ok bool + s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{ + Name: "sw", + Group: "default", + }) + Expect(ok).To(BeTrue()) }) - Expect(ok).To(BeTrue()) - }) - AfterEach(func() { - deferFn() - }) - Context("Select shard", func() { + AfterEach(func() { + deferFn() + }) tests := []struct { name string entity tsdb.Entity @@ -107,11 +108,26 @@ var _ = Describe("Write", func() { }) } }) - Context("Querying by local indices", func() { - var now time.Time - BeforeEach(func() { + Context("Querying by local indices", Ordered, func() { + var ( + s *stream + now time.Time + deferFn func() + ) + BeforeAll(func() { + var svcs *services + svcs, deferFn = setUp() + var ok bool + s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{ + Name: "sw", + Group: "default", + }) + Expect(ok).To(BeTrue()) now = setupQueryData("multiple_shards.json", s) }) + AfterAll(func() { + deferFn() + }) When("", func() { l1 := []string{fmt.Sprintf("series_%d", tsdb.SeriesID(tsdb.Entity{ tsdb.Entry("sw"), @@ -581,10 +597,25 @@ var _ = Describe("Write", func() { }) }) - Context("Querying by global indices", func() { - BeforeEach(func() { + Context("Querying by global indices", Ordered, func() { + var ( + s *stream + deferFn func() + ) + BeforeAll(func() { + var svcs *services + svcs, deferFn = setUp() + var ok bool + s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{ + Name: "sw", + Group: "default", + }) + Expect(ok).To(BeTrue()) _ = setupQueryData("global_index.json", s) }) + AfterAll(func() { + deferFn() + }) DescribeTable("", func(traceID string, wantTraceSegmentNum int, wantErr bool) { shards, errShards := s.Shards(nil) Expect(errShards).ShouldNot(HaveOccurred()) diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go index 53b7618..b3d3f60 100644 --- a/banyand/stream/stream_suite_test.go +++ b/banyand/stream/stream_suite_test.go @@ -19,7 +19,6 @@ package stream import ( "context" - "fmt" "testing" "github.com/golang/mock/gomock" @@ -31,7 +30,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/discovery" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" teststream "github.com/apache/skywalking-banyandb/pkg/test/stream" @@ -50,47 +48,6 @@ var _ = BeforeSuite(func() { })).To(Succeed()) }) -var ( - _ gomock.Matcher = (*shardEventMatcher)(nil) - _ gomock.Matcher = (*entityEventMatcher)(nil) -) - -type shardEventMatcher struct { - action databasev1.Action -} - -func (s *shardEventMatcher) Matches(x interface{}) bool { - if m, messageOk := x.(bus.Message); messageOk { - if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk { - return evt.Action == s.action - } - } - - return false -} - -func (s *shardEventMatcher) String() string { - return fmt.Sprintf("shard-event-matcher(%s)", databasev1.Action_name[int32(s.action)]) -} - -type entityEventMatcher struct { - action databasev1.Action -} - -func (s *entityEventMatcher) Matches(x interface{}) bool { - if m, messageOk := x.(bus.Message); messageOk { - if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk { - return evt.Action == s.action - } - } - - return false -} - -func (s *entityEventMatcher) String() string { - return fmt.Sprintf("entity-event-matcher(%s)", databasev1.Action_name[int32(s.action)]) -} - // service to preload stream type preloadStreamService struct { metaSvc metadata.Service @@ -117,8 +74,8 @@ func setUp() (*services, func()) { repo := discovery.NewMockServiceRepo(ctrl) repo.EXPECT().NodeID().AnyTimes() // Both PreRun and Serve phases send events - repo.EXPECT().Publish(event.StreamTopicEntityEvent, &entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(2 * 1) - repo.EXPECT().Publish(event.StreamTopicShardEvent, &shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(2 * 2) + repo.EXPECT().Publish(event.StreamTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 1) + repo.EXPECT().Publish(event.StreamTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2) // Init Pipeline pipeline, err := queue.NewQueue(context.TODO(), repo) diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go new file mode 100644 index 0000000..cbbe687 --- /dev/null +++ b/banyand/tsdb/scope.go @@ -0,0 +1,85 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package tsdb + +import "github.com/apache/skywalking-banyandb/api/common" + +var _ Shard = (*ScopedShard)(nil) + +type ScopedShard struct { + scope Entry + delegated Shard +} + +func NewScopedShard(scope Entry, delegated Shard) Shard { + return &ScopedShard{ + scope: scope, + delegated: delegated, + } +} + +func (sd *ScopedShard) Close() error { + // the delegate can't close the underlying shard + return nil +} + +func (sd *ScopedShard) ID() common.ShardID { + return sd.delegated.ID() +} + +func (sd *ScopedShard) Series() SeriesDatabase { + return &scopedSeriesDatabase{ + scope: sd.scope, + delegated: sd.delegated.Series(), + } +} + +func (sd *ScopedShard) Index() IndexDatabase { + return sd.delegated.Index() +} + +func (sd *ScopedShard) State() ShardState { + return sd.delegated.State() +} + +var _ SeriesDatabase = (*scopedSeriesDatabase)(nil) + +type scopedSeriesDatabase struct { + scope Entry + delegated SeriesDatabase +} + +func (sdd *scopedSeriesDatabase) Close() error { + return nil +} + +func (sdd *scopedSeriesDatabase) GetByHashKey(key []byte) (Series, error) { + return sdd.delegated.GetByHashKey(key) +} + +func (sdd *scopedSeriesDatabase) GetByID(id common.SeriesID) (Series, error) { + return sdd.delegated.GetByID(id) +} + +func (sdd *scopedSeriesDatabase) Get(entity Entity) (Series, error) { + return sdd.delegated.Get(entity.Prepend(sdd.scope)) +} + +func (sdd *scopedSeriesDatabase) List(path Path) (SeriesList, error) { + return sdd.delegated.List(path.Prepand(sdd.scope)) +} diff --git a/pkg/test/matcher.go b/pkg/test/matcher.go new file mode 100644 index 0000000..f218bf0 --- /dev/null +++ b/pkg/test/matcher.go @@ -0,0 +1,80 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package test + +import ( + "fmt" + + "github.com/golang/mock/gomock" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/bus" +) + +var ( + _ gomock.Matcher = (*shardEventMatcher)(nil) + _ gomock.Matcher = (*entityEventMatcher)(nil) +) + +type shardEventMatcher struct { + action databasev1.Action +} + +func NewShardEventMatcher(action databasev1.Action) gomock.Matcher { + return &shardEventMatcher{ + action: action, + } +} + +func (s *shardEventMatcher) Matches(x interface{}) bool { + if m, messageOk := x.(bus.Message); messageOk { + if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk { + return evt.Action == s.action + } + } + + return false +} + +func (s *shardEventMatcher) String() string { + return fmt.Sprintf("shard-event-matcher(%s)", databasev1.Action_name[int32(s.action)]) +} + +type entityEventMatcher struct { + action databasev1.Action +} + +func NewEntityEventMatcher(action databasev1.Action) gomock.Matcher { + return &entityEventMatcher{ + action: action, + } +} + +func (s *entityEventMatcher) Matches(x interface{}) bool { + if m, messageOk := x.(bus.Message); messageOk { + if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk { + return evt.Action == s.action + } + } + + return false +} + +func (s *entityEventMatcher) String() string { + return fmt.Sprintf("entity-event-matcher(%s)", databasev1.Action_name[int32(s.action)]) +} diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go index c08968e..9f20108 100644 --- a/pkg/test/measure/etcd.go +++ b/pkg/test/measure/etcd.go @@ -21,13 +21,13 @@ import ( "context" "embed" "fmt" - "math/rand" "os" "path" "github.com/google/uuid" "google.golang.org/protobuf/encoding/protojson" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) @@ -41,9 +41,18 @@ var ( indexRuleBindingJSON string //go:embed testdata/measure.json measureJSON string + //go:embed testdata/group.json + groupJSON string ) func PreloadSchema(e schema.Registry) error { + g := &commonv1.Group{} + if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil { + return err + } + if err := e.UpdateGroup(context.TODO(), g); err != nil { + return err + } s := &databasev1.Measure{} if err := protojson.Unmarshal([]byte(measureJSON), s); err != nil { return err @@ -88,9 +97,3 @@ func PreloadSchema(e schema.Registry) error { func RandomTempDir() string { return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", uuid.New().String())) } - -func RandomUnixDomainListener() (string, string) { - i := rand.Uint64() - return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), - fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1) -} diff --git a/pkg/test/measure/testdata/group.json b/pkg/test/measure/testdata/group.json new file mode 100644 index 0000000..c2b6c02 --- /dev/null +++ b/pkg/test/measure/testdata/group.json @@ -0,0 +1,27 @@ +{ + "metadata": { + "name": "default" + }, + "catalog": "CATALOG_MEASURE", + "resource_opts": { + "shard_num": 2, + "interval_rules": [ + { + "tag_name": "scope", + "str": "minute", + "interval": "1m" + }, + { + "tag_name": "scope", + "str": "hour", + "interval": "1h" + }, + { + "tag_name": "scope", + "str": "day", + "interval": "1d" + } + ] + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/measure.json b/pkg/test/measure/testdata/measure.json index a0ab240..47e341a 100644 --- a/pkg/test/measure/testdata/measure.json +++ b/pkg/test/measure/testdata/measure.json @@ -43,29 +43,5 @@ "entity_id" ] }, - "interval_rules": [ - { - "tag_name": "scope", - "str": "minute", - "interval": "1m" - }, - { - "tag_name": "scope", - "str": "hour", - "interval": "1h" - }, - { - "tag_name": "scope", - "str": "day", - "interval": "1d" - } - ], - "opts": { - "shard_num": 2, - "ttl": { - "val": 7, - "unit": "DURATION_UNIT_DAY" - } - }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file
