This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 90c441d  Add measure module (#86)
90c441d is described below

commit 90c441d9b5468b5bbd1ea1b6231b3e4e0755b9c3
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Feb 25 22:21:57 2022 +0800

    Add measure module (#86)
---
 banyand/{stream/stream.go => measure/measure.go}   |  47 ++---
 .../stream_query.go => measure/measure_query.go}   | 117 +++--------
 banyand/measure/measure_query_test.go              | 104 ++++++++++
 banyand/measure/measure_suite_test.go              | 120 +++++++++++
 banyand/measure/measure_write.go                   | 227 +++++++++++++++++++++
 banyand/measure/measure_write_test.go              |  79 +++++++
 banyand/measure/metadata.go                        | 207 +++++++++++++++++++
 banyand/{stream => measure}/metadata_test.go       |  65 +++---
 banyand/measure/service.go                         | 156 ++++++++++++++
 banyand/measure/testdata/query_data.json           | 107 ++++++++++
 banyand/measure/testdata/write_data.json           | 107 ++++++++++
 banyand/stream/metadata_test.go                    |  11 +-
 banyand/stream/stream.go                           |  13 +-
 banyand/stream/stream_query.go                     |  71 +------
 banyand/stream/stream_query_test.go                |  75 +++++--
 banyand/stream/stream_suite_test.go                |  47 +----
 banyand/tsdb/scope.go                              |  85 ++++++++
 pkg/test/matcher.go                                |  80 ++++++++
 pkg/test/measure/etcd.go                           |  17 +-
 pkg/test/measure/testdata/group.json               |  27 +++
 pkg/test/measure/testdata/measure.json             |  24 ---
 21 files changed, 1460 insertions(+), 326 deletions(-)

diff --git a/banyand/stream/stream.go b/banyand/measure/measure.go
similarity index 69%
copy from banyand/stream/stream.go
copy to banyand/measure/measure.go
index 2856938..481197d 100644
--- a/banyand/stream/stream.go
+++ b/banyand/measure/measure.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package stream
+package measure
 
 import (
        "context"
@@ -26,21 +26,18 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
-       "github.com/apache/skywalking-banyandb/pkg/schema"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 // a chunk is 1MB
 const chunkSize = 1 << 20
 
-var _ schema.Resource = (*stream)(nil)
-
-type stream struct {
+type measure struct {
        name     string
        group    string
        shardNum uint32
        l        *logger.Logger
-       // schema is the reference to the spec of the stream
-       schema *databasev1.Stream
+       schema   *databasev1.Measure
        // maxObservedModRevision is the max observed revision of index rules in the spec
        maxObservedModRevision int64
        db                     tsdb.Supplier
@@ -49,49 +46,43 @@ type stream struct {
        indexWriter            *index.Writer
 }
 
-func (s *stream) GetMetadata() *commonv1.Metadata {
+func (s *measure) GetSchema() *databasev1.Measure {
+       return s.schema
+}
+
+func (s *measure) GetMetadata() *commonv1.Metadata {
        return s.schema.Metadata
 }
 
-func (s *stream) GetIndexRules() []*databasev1.IndexRule {
+func (s *measure) GetIndexRules() []*databasev1.IndexRule {
        return s.indexRules
 }
 
-func (s *stream) MaxObservedModRevision() int64 {
+func (s *measure) MaxObservedModRevision() int64 {
        return s.maxObservedModRevision
 }
 
-func (s *stream) EntityLocator() partition.EntityLocator {
+func (s *measure) EntityLocator() partition.EntityLocator {
        return s.entityLocator
 }
 
-func (s *stream) Close() error {
+func (s *measure) Close() error {
        return s.indexWriter.Close()
 }
 
-func parseMaxModRevision(indexRules []*databasev1.IndexRule) 
(maxRevisionForIdxRules int64) {
-       maxRevisionForIdxRules = int64(0)
-       for _, idxRule := range indexRules {
-               if idxRule.GetMetadata().GetModRevision() > 
maxRevisionForIdxRules {
-                       maxRevisionForIdxRules = 
idxRule.GetMetadata().GetModRevision()
-               }
-       }
-       return
-}
-
-func (s *stream) parseSpec() {
+func (s *measure) parseSpec() {
        s.name, s.group = s.schema.GetMetadata().GetName(), 
s.schema.GetMetadata().GetGroup()
        s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), 
s.schema.GetEntity())
-       s.maxObservedModRevision = parseMaxModRevision(s.indexRules)
+       s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
 }
 
-type streamSpec struct {
-       schema     *databasev1.Stream
+type measureSpec struct {
+       schema     *databasev1.Measure
        indexRules []*databasev1.IndexRule
 }
 
-func openStream(shardNum uint32, db tsdb.Supplier, spec streamSpec, l 
*logger.Logger) (*stream, error) {
-       sm := &stream{
+func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l 
*logger.Logger) (*measure, error) {
+       sm := &measure{
                shardNum:   shardNum,
                schema:     spec.schema,
                indexRules: spec.indexRules,
diff --git a/banyand/stream/stream_query.go b/banyand/measure/measure_query.go
similarity index 53%
copy from banyand/stream/stream_query.go
copy to banyand/measure/measure_query.go
index 2896f0d..4d482b2 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/measure/measure_query.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package stream
+package measure
 
 import (
        "io"
@@ -26,10 +26,11 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/partition"
+       resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 var (
@@ -37,25 +38,27 @@ var (
 )
 
 type Query interface {
-       Stream(stream *commonv1.Metadata) (Stream, error)
+       LoadGroup(name string) (resourceSchema.Group, bool)
+       Measure(measure *commonv1.Metadata) (Measure, error)
 }
 
-type Stream interface {
+type Measure interface {
        io.Closer
-       Write(value *streamv1.ElementValue) error
+       Write(value *measurev1.DataPointValue) error
        Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
        Shard(id common.ShardID) (tsdb.Shard, error)
        ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, 
error)
-       ParseElementID(item tsdb.Item) (string, error)
+       ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, 
error)
+       GetSchema() *databasev1.Measure
 }
 
-var _ Stream = (*stream)(nil)
+var _ Measure = (*measure)(nil)
 
-func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
+func (s *measure) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
        wrap := func(shards []tsdb.Shard) []tsdb.Shard {
                result := make([]tsdb.Shard, len(shards))
                for i := 0; i < len(shards); i++ {
-                       result[i] = newShardDelegate(tsdb.Entry(s.name), 
shards[i])
+                       result[i] = tsdb.NewScopedShard(tsdb.Entry(s.name), 
shards[i])
                }
                return result
        }
@@ -76,21 +79,21 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, 
error) {
        if err != nil {
                return nil, err
        }
-       return []tsdb.Shard{newShardDelegate(tsdb.Entry(s.name), shard)}, nil
+       return []tsdb.Shard{tsdb.NewScopedShard(tsdb.Entry(s.name), shard)}, nil
 }
 
-func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
+func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
        shard, err := s.db.SupplyTSDB().Shard(id)
        if err != nil {
                return nil, err
        }
-       return newShardDelegate(tsdb.Entry(s.name), shard), nil
+       return tsdb.NewScopedShard(tsdb.Entry(s.name), shard), nil
 }
 
-func (s *stream) ParseTagFamily(family string, item tsdb.Item) 
(*modelv1.TagFamily, error) {
-       familyRawBytes, err := item.Family(family)
+func (s *measure) ParseTagFamily(family string, item tsdb.Item) 
(*modelv1.TagFamily, error) {
+       familyRawBytes, err := item.Family(string(familyIdentity(family, 
TagFlag)))
        if err != nil {
-               return nil, errors.Wrapf(err, "parse family %s", family)
+               return nil, err
        }
        tagFamily := &modelv1.TagFamilyForWrite{}
        err = proto.Unmarshal(familyRawBytes, tagFamily)
@@ -121,75 +124,21 @@ func (s *stream) ParseTagFamily(family string, item 
tsdb.Item) (*modelv1.TagFami
        }, err
 }
 
-func (s *stream) ParseElementID(item tsdb.Item) (string, error) {
-       rawBytes, err := item.Val()
-       if err != nil {
-               return "", err
-       }
-       return string(rawBytes), nil
-}
-
-var _ tsdb.Shard = (*shardDelegate)(nil)
-
-type shardDelegate struct {
-       scope     tsdb.Entry
-       delegated tsdb.Shard
-}
-
-func newShardDelegate(scope tsdb.Entry, delegated tsdb.Shard) tsdb.Shard {
-       return &shardDelegate{
-               scope:     scope,
-               delegated: delegated,
+func (s *measure) ParseField(name string, item tsdb.Item) 
(*measurev1.DataPoint_Field, error) {
+       var fieldSpec *databasev1.FieldSpec
+       for _, spec := range s.schema.GetFields() {
+               if spec.GetName() == name {
+                       fieldSpec = spec
+                       break
+               }
        }
-}
-
-func (sd *shardDelegate) Close() error {
-       // the delegate can't close the underlying shard
-       return nil
-}
-
-func (sd *shardDelegate) ID() common.ShardID {
-       return sd.delegated.ID()
-}
-
-func (sd *shardDelegate) Series() tsdb.SeriesDatabase {
-       return &seriesDatabaseDelegate{
-               scope:     sd.scope,
-               delegated: sd.delegated.Series(),
+       bytes, err := item.Family(string(familyIdentity(name, 
encoderFieldFlag(fieldSpec))))
+       if err != nil {
+               return nil, err
        }
-}
-
-func (sd *shardDelegate) Index() tsdb.IndexDatabase {
-       return sd.delegated.Index()
-}
-
-func (sd *shardDelegate) State() tsdb.ShardState {
-       return sd.delegated.State()
-}
-
-var _ tsdb.SeriesDatabase = (*seriesDatabaseDelegate)(nil)
-
-type seriesDatabaseDelegate struct {
-       scope     tsdb.Entry
-       delegated tsdb.SeriesDatabase
-}
-
-func (sdd *seriesDatabaseDelegate) Close() error {
-       return nil
-}
-
-func (sdd *seriesDatabaseDelegate) GetByHashKey(key []byte) (tsdb.Series, 
error) {
-       return sdd.delegated.GetByHashKey(key)
-}
-
-func (sdd *seriesDatabaseDelegate) GetByID(id common.SeriesID) (tsdb.Series, 
error) {
-       return sdd.delegated.GetByID(id)
-}
-
-func (sdd *seriesDatabaseDelegate) Get(entity tsdb.Entity) (tsdb.Series, 
error) {
-       return sdd.delegated.Get(entity.Prepend(sdd.scope))
-}
-
-func (sdd *seriesDatabaseDelegate) List(path tsdb.Path) (tsdb.SeriesList, 
error) {
-       return sdd.delegated.List(path.Prepand(sdd.scope))
+       fieldValue := decodeFieldValue(bytes, fieldSpec)
+       return &measurev1.DataPoint_Field{
+               Name:  name,
+               Value: fieldValue,
+       }, err
 }
diff --git a/banyand/measure/measure_query_test.go 
b/banyand/measure/measure_query_test.go
new file mode 100644
index 0000000..3a9828d
--- /dev/null
+++ b/banyand/measure/measure_query_test.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure_test
+
+import (
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+var _ = Describe("Write", func() {
+       var svcs *services
+       var deferFn func()
+       var measure measure.Measure
+
+       BeforeEach(func() {
+               svcs, deferFn = setUp()
+               var err error
+               measure, err = svcs.measure.Measure(&commonv1.Metadata{
+                       Name:  "cpm",
+                       Group: "default",
+               })
+               Expect(err).ShouldNot(HaveOccurred())
+       })
+       AfterEach(func() {
+               deferFn()
+       })
+       It("queries data", func() {
+               baseTime := writeData("query_data.json", measure)
+               shard, err := measure.Shard(0)
+               Expect(err).ShouldNot(HaveOccurred())
+               series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")})
+               Expect(err).ShouldNot(HaveOccurred())
+               seriesSpan, err := 
series.Span(timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
+               defer func(seriesSpan tsdb.SeriesSpan) {
+                       _ = seriesSpan.Close()
+               }(seriesSpan)
+               Expect(err).ShouldNot(HaveOccurred())
+               seeker, err := 
seriesSpan.SeekerBuilder().OrderByTime(modelv1.Sort_SORT_DESC).Build()
+               Expect(err).ShouldNot(HaveOccurred())
+               iter, err := seeker.Seek()
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(len(iter)).To(Equal(1))
+               defer func(iterator tsdb.Iterator) {
+                       _ = iterator.Close()
+               }(iter[0])
+               i := 0
+               expectedFields := [][]int64{{150, 300, 5}, {200, 50, 4}, {100, 
100, 1}}
+               for ; iter[0].Next(); i++ {
+                       item := iter[0].Val()
+                       tagFamily, err := measure.ParseTagFamily("default", 
item)
+                       Expect(err).ShouldNot(HaveOccurred())
+                       Expect(len(tagFamily.Tags)).To(Equal(2))
+                       summation, err := measure.ParseField("summation", item)
+                       Expect(err).ShouldNot(HaveOccurred())
+                       
Expect(summation.GetValue().GetInt().Value).To(Equal(expectedFields[i][0]))
+                       count, err := measure.ParseField("count", item)
+                       Expect(err).ShouldNot(HaveOccurred())
+                       
Expect(count.GetValue().GetInt().Value).To(Equal(expectedFields[i][1]))
+                       value, err := measure.ParseField("value", item)
+                       Expect(err).ShouldNot(HaveOccurred())
+                       
Expect(value.GetValue().GetInt().Value).To(Equal(expectedFields[i][2]))
+               }
+       })
+})
diff --git a/banyand/measure/measure_suite_test.go 
b/banyand/measure/measure_suite_test.go
new file mode 100644
index 0000000..c8c80fb
--- /dev/null
+++ b/banyand/measure/measure_suite_test.go
@@ -0,0 +1,120 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure_test
+
+import (
+       "context"
+       "testing"
+
+       "github.com/golang/mock/gomock"
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+
+       "github.com/apache/skywalking-banyandb/api/event"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+)
+
+func TestMeasure(t *testing.T) {
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Measure Suite")
+}
+
+// BeforeSuite - Init logger
+var _ = ginkgo.BeforeSuite(func() {
+       gomega.Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "info",
+       })).To(gomega.Succeed())
+})
+
+// service to preload measure
+type preloadMeasureService struct {
+       metaSvc metadata.Service
+}
+
+func (p *preloadMeasureService) Name() string {
+       return "preload-measure"
+}
+
+func (p *preloadMeasureService) PreRun() error {
+       return testmeasure.PreloadSchema(p.metaSvc.SchemaRegistry())
+}
+
+type services struct {
+       measure         measure.Service
+       metadataService metadata.Service
+       repo            *discovery.MockServiceRepo
+       pipeline        queue.Queue
+}
+
+func setUp() (*services, func()) {
+       ctrl := gomock.NewController(ginkgo.GinkgoT())
+       gomega.Expect(ctrl).ShouldNot(gomega.BeNil())
+       // Init Discovery
+       repo := discovery.NewMockServiceRepo(ctrl)
+       repo.EXPECT().NodeID().AnyTimes()
+       // Both PreRun and Serve phases send events
+       repo.EXPECT().Publish(event.MeasureTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 1)
+       repo.EXPECT().Publish(event.MeasureTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2)
+
+       // Init Pipeline
+       pipeline, err := queue.NewQueue(context.TODO(), repo)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+       // Init Metadata Service
+       metadataService, err := metadata.NewService(context.TODO())
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+       // Init Measure Service
+       measureService, err := measure.NewService(context.TODO(), 
metadataService, repo, pipeline)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
+       var flags []string
+       metaPath, metaDeferFunc, err := test.NewSpace()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       flags = append(flags, "--metadata-root-path="+metaPath)
+       rootPath, deferFunc, err := test.NewSpace()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       flags = append(flags, "--root-path="+rootPath)
+       moduleDeferFunc := test.SetUpModules(
+               flags,
+               repo,
+               pipeline,
+               metadataService,
+               preloadMeasureSvc,
+               measureService,
+       )
+       return &services{
+                       measure:         measureService,
+                       metadataService: metadataService,
+                       repo:            repo,
+                       pipeline:        pipeline,
+               }, func() {
+                       moduleDeferFunc()
+                       metaDeferFunc()
+                       deferFunc()
+               }
+}
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
new file mode 100644
index 0000000..5b777e7
--- /dev/null
+++ b/banyand/measure/measure_write.go
@@ -0,0 +1,227 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "bytes"
+
+       "github.com/pkg/errors"
+       "google.golang.org/protobuf/proto"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb/index"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+var (
+       ErrMalformedElement = errors.New("element is malformed")
+)
+
+const (
+       TagFlag byte = iota
+)
+
+func (s *measure) Write(value *measurev1.DataPointValue) error {
+       entity, shardID, err := s.entityLocator.Locate(s.name, 
value.GetTagFamilies(), s.shardNum)
+       if err != nil {
+               return err
+       }
+       waitCh := make(chan struct{})
+       err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
+               close(waitCh)
+       })
+       if err != nil {
+               close(waitCh)
+               return err
+       }
+       <-waitCh
+       return nil
+}
+
+func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value 
*measurev1.DataPointValue, cb index.CallbackFn) error {
+       sm := s.schema
+       fLen := len(value.GetTagFamilies())
+       if fLen < 1 {
+               return errors.Wrap(ErrMalformedElement, "no tag family")
+       }
+       if fLen > len(sm.TagFamilies) {
+               return errors.Wrap(ErrMalformedElement, "tag family number is 
more than expected")
+       }
+       shard, err := s.db.SupplyTSDB().Shard(shardID)
+       if err != nil {
+               return err
+       }
+       series, err := shard.Series().GetByHashKey(seriesHashKey)
+       if err != nil {
+               return err
+       }
+       t := value.GetTimestamp().AsTime()
+       wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
+       if err != nil {
+               if wp != nil {
+                       _ = wp.Close()
+               }
+               return err
+       }
+       writeFn := func() (tsdb.Writer, error) {
+               builder := wp.WriterBuilder().Time(t)
+               for fi, family := range value.GetTagFamilies() {
+                       familySpec := sm.GetTagFamilies()[fi]
+                       if len(family.GetTags()) > len(familySpec.GetTags()) {
+                               return nil, errors.Wrap(ErrMalformedElement, 
"tag number is more than expected")
+                       }
+                       for ti, tag := range family.GetTags() {
+                               tagSpec := familySpec.GetTags()[ti]
+                               tType, isNull := pbv1.TagValueTypeConv(tag)
+                               if isNull {
+                                       continue
+                               }
+                               if tType != tagSpec.GetType() {
+                                       return nil, 
errors.Wrapf(ErrMalformedElement, "tag %s type is unexpected", 
tagSpec.GetName())
+                               }
+                       }
+                       bb, errMarshal := proto.Marshal(family)
+                       if errMarshal != nil {
+                               return nil, errMarshal
+                       }
+                       
builder.Family(familyIdentity(sm.GetTagFamilies()[fi].GetName(), TagFlag), bb)
+               }
+               if len(value.GetFields()) > len(sm.GetFields()) {
+                       return nil, errors.Wrap(ErrMalformedElement, "fields 
number is more than expected")
+               }
+               for fi, fieldValue := range value.GetFields() {
+                       fieldSpec := sm.GetFields()[fi]
+                       fType, isNull := pbv1.FieldValueTypeConv(fieldValue)
+                       if isNull {
+                               continue
+                       }
+                       if fType != fieldSpec.GetFieldType() {
+                               return nil, errors.Wrapf(ErrMalformedElement, 
"field %s type is unexpected", fieldSpec.GetName())
+                       }
+                       data := encodeFieldValue(fieldValue)
+                       if data == nil {
+                               continue
+                       }
+                       
builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), 
encoderFieldFlag(fieldSpec)), data)
+               }
+               writer, errWrite := builder.Build()
+               if errWrite != nil {
+                       return nil, errWrite
+               }
+               _, errWrite = writer.Write()
+               s.l.Debug().
+                       Time("ts", t).
+                       Int("ts_nano", t.Nanosecond()).
+                       Interface("data", value).
+                       Uint64("series_id", uint64(series.ID())).
+                       Uint64("item_id", uint64(writer.ItemID().ID)).
+                       Int("shard_id", int(shardID)).
+                       Msg("write measure")
+               return writer, errWrite
+       }
+       writer, err := writeFn()
+       if err != nil {
+               _ = wp.Close()
+               return err
+       }
+       m := index.Message{
+               LocalWriter: writer,
+               Value: index.Value{
+                       TagFamilies: value.GetTagFamilies(),
+                       Timestamp:   value.GetTimestamp().AsTime(),
+               },
+               BlockCloser: wp,
+               Cb:          cb,
+       }
+       s.indexWriter.Write(m)
+       return err
+}
+
+type writeCallback struct {
+       l          *logger.Logger
+       schemaRepo *schemaRepo
+}
+
+func setUpWriteCallback(l *logger.Logger, schemaRepo *schemaRepo) 
*writeCallback {
+       wcb := &writeCallback{
+               l:          l,
+               schemaRepo: schemaRepo,
+       }
+       return wcb
+}
+
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+       writeEvent, ok := message.Data().(*measurev1.InternalWriteRequest)
+       if !ok {
+               w.l.Warn().Msg("invalid event data type")
+               return
+       }
+
+       stm, ok := 
w.schemaRepo.loadMeasure(writeEvent.GetRequest().GetMetadata())
+       if !ok {
+               w.l.Warn().Msg("cannot find measure definition")
+               return
+       }
+       err := stm.write(common.ShardID(writeEvent.GetShardId()), 
writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil)
+       if err != nil {
+               w.l.Debug().Err(err).Msg("fail to write entity")
+       }
+       return
+}
+
+func familyIdentity(name string, flag byte) []byte {
+       return bytes.Join([][]byte{[]byte(name), {flag}}, nil)
+}
+
+func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
+       switch fieldValue.GetValue().(type) {
+       case *modelv1.FieldValue_Int:
+               return convert.Int64ToBytes(fieldValue.GetInt().GetValue())
+       case *modelv1.FieldValue_Str:
+               return []byte(fieldValue.GetStr().Value)
+       case *modelv1.FieldValue_BinaryData:
+               return fieldValue.GetBinaryData()
+       }
+       return nil
+}
+
+func decodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) 
*modelv1.FieldValue {
+       switch fieldSpec.GetFieldType() {
+       case databasev1.FieldType_FIELD_TYPE_STRING:
+               return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: 
&modelv1.Str{Value: string(fieldValue)}}}
+       case databasev1.FieldType_FIELD_TYPE_INT:
+               return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: convert.BytesToInt64(fieldValue)}}}
+       case databasev1.FieldType_FIELD_TYPE_DATA_BINARY:
+               return &modelv1.FieldValue{Value: 
&modelv1.FieldValue_BinaryData{BinaryData: fieldValue}}
+       }
+       return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
+}
+
+func encoderFieldFlag(fieldSpec *databasev1.FieldSpec) byte {
+       encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
+       compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
+       return encodingMethod<<4 | compressionMethod
+}
diff --git a/banyand/measure/measure_write_test.go 
b/banyand/measure/measure_write_test.go
new file mode 100644
index 0000000..eceaaff
--- /dev/null
+++ b/banyand/measure/measure_write_test.go
@@ -0,0 +1,79 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure_test
+
+import (
+       "embed"
+       "encoding/json"
+       "time"
+
+       "github.com/golang/protobuf/jsonpb"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+)
+
+// Write suite: every spec calls setUp, resolves the "cpm" measure of the
+// "default" group and runs the tear-down function afterwards.
+var _ = Describe("Write", func() {
+       var (
+               deps     *services
+               tearDown func()
+               cpm      measure.Measure
+       )
+
+       BeforeEach(func() {
+               deps, tearDown = setUp()
+               target := &commonv1.Metadata{
+                       Name:  "cpm",
+                       Group: "default",
+               }
+               var err error
+               cpm, err = deps.measure.Measure(target)
+               Expect(err).ShouldNot(HaveOccurred())
+       })
+       AfterEach(func() {
+               tearDown()
+       })
+       It("writes data", func() {
+               writeData("write_data.json", cpm)
+       })
+})
+
+// dataFS embeds the JSON fixtures matched by testdata/*.json; writeData reads
+// its input files from it.
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+// writeData loads the JSON array stored in testdata/<dataFile>, decodes every
+// element into a DataPointValue and writes it to the given measure.
+// An element without a timestamp gets one spaced a minute apart per index,
+// starting at a clock reading taken just before the loop.
+// It returns the time captured on entry, before the fixture file is read.
+func writeData(dataFile string, measure measure.Measure) (baseTime time.Time) {
+       baseTime = time.Now()
+       content, err := dataFS.ReadFile("testdata/" + dataFile)
+       Expect(err).ShouldNot(HaveOccurred())
+       // Keep each element as raw JSON so numbers are not routed through float64.
+       var templates []json.RawMessage
+       Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
+       now := time.Now()
+       for i, template := range templates {
+               dataPointValue := &measurev1.DataPointValue{}
+               Expect(jsonpb.UnmarshalString(string(template), dataPointValue)).ShouldNot(HaveOccurred())
+               // Apply the default only after decoding, so a timestamp supplied by
+               // the fixture is respected regardless of the decoder's merge rules.
+               if dataPointValue.GetTimestamp() == nil {
+                       dataPointValue.Timestamp = timestamppb.New(now.Add(time.Duration(i) * time.Minute))
+               }
+               Expect(measure.Write(dataPointValue)).ShouldNot(HaveOccurred())
+       }
+       return baseTime
+}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
new file mode 100644
index 0000000..2a2e96e
--- /dev/null
+++ b/banyand/measure/metadata.go
@@ -0,0 +1,207 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+       "context"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/event"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
+)
+
+// schemaRepo embeds a resourceSchema.Repository and keeps the logger and the
+// metadata repository that its OnAddOrUpdate/OnDelete handlers rely on.
+type schemaRepo struct {
+       resourceSchema.Repository
+       l        *logger.Logger
+       metadata metadata.Repo
+}
+
+// newSchemaRepo assembles a schemaRepo: the embedded repository is created
+// with a measure supplier rooted at path and the measure shard/entity topics.
+func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo {
+       sup := newSupplier(path, metadata, l)
+       inner := resourceSchema.NewRepository(
+               metadata,
+               repo,
+               l,
+               sup,
+               event.MeasureTopicShardEvent,
+               event.MeasureTopicEntityEvent,
+       )
+       return schemaRepo{
+               Repository: inner,
+               l:          l,
+               metadata:   metadata,
+       }
+}
+
+// OnAddOrUpdate translates an add/update notification from the metadata
+// registry into EventAddOrUpdate metadata events:
+//   - a group of the measure catalog produces a group event;
+//   - a measure produces a resource event;
+//   - an index rule binding whose subject is a measure produces a resource
+//     event for that measure;
+//   - an index rule produces a resource event for every measure returned by
+//     metadata.Subjects.
+//
+// A spec of an unexpected type is logged and skipped rather than panicking.
+func (sr *schemaRepo) OnAddOrUpdate(m schema.Metadata) {
+       switch m.Kind {
+       case schema.KindGroup:
+               g, ok := m.Spec.(*commonv1.Group)
+               if !ok {
+                       sr.l.Warn().Msg("fail to convert message to Group")
+                       return
+               }
+               // Groups that belong to another catalog are ignored.
+               if g.GetCatalog() != commonv1.Catalog_CATALOG_MEASURE {
+                       return
+               }
+               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                       Typ:      resourceSchema.EventAddOrUpdate,
+                       Kind:     resourceSchema.EventKindGroup,
+                       Metadata: g.GetMetadata(),
+               })
+       case schema.KindMeasure:
+               ms, ok := m.Spec.(*databasev1.Measure)
+               if !ok {
+                       sr.l.Warn().Msg("fail to convert message to Measure")
+                       return
+               }
+               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                       Typ:      resourceSchema.EventAddOrUpdate,
+                       Kind:     resourceSchema.EventKindResource,
+                       Metadata: ms.GetMetadata(),
+               })
+       case schema.KindIndexRuleBinding:
+               irb, ok := m.Spec.(*databasev1.IndexRuleBinding)
+               if !ok {
+                       sr.l.Warn().Msg("fail to convert message to IndexRuleBinding")
+                       return
+               }
+               if irb.GetSubject().GetCatalog() != commonv1.Catalog_CATALOG_MEASURE {
+                       return
+               }
+               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+               stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{
+                       Name:  irb.GetSubject().GetName(),
+                       Group: m.Group,
+               })
+               cancel()
+               if err != nil {
+                       sr.l.Error().Err(err).Msg("fail to get subject")
+                       return
+               }
+               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                       Typ:      resourceSchema.EventAddOrUpdate,
+                       Kind:     resourceSchema.EventKindResource,
+                       Metadata: stm.GetMetadata(),
+               })
+       case schema.KindIndexRule:
+               rule, ok := m.Spec.(*databasev1.IndexRule)
+               if !ok {
+                       sr.l.Warn().Msg("fail to convert message to IndexRule")
+                       return
+               }
+               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+               subjects, err := sr.metadata.Subjects(ctx, rule, commonv1.Catalog_CATALOG_MEASURE)
+               cancel()
+               if err != nil {
+                       sr.l.Error().Err(err).Msg("fail to get subjects(measure)")
+                       return
+               }
+               for _, sub := range subjects {
+                       ms, isMeasure := sub.(*databasev1.Measure)
+                       if !isMeasure {
+                               sr.l.Warn().Msg("fail to convert subject to Measure")
+                               continue
+                       }
+                       sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                               Typ:      resourceSchema.EventAddOrUpdate,
+                               Kind:     resourceSchema.EventKindResource,
+                               Metadata: ms.GetMetadata(),
+                       })
+               }
+       default:
+       }
+}
+
+// OnDelete translates a delete notification from the metadata registry into
+// EventDelete metadata events for measure-catalog groups, measures and index
+// rule bindings whose subject is a measure. Index rule deletions are ignored.
+//
+// A spec of an unexpected type is logged and skipped rather than panicking.
+func (sr *schemaRepo) OnDelete(m schema.Metadata) {
+       switch m.Kind {
+       case schema.KindGroup:
+               g, ok := m.Spec.(*commonv1.Group)
+               if !ok {
+                       sr.l.Warn().Msg("fail to convert message to Group")
+                       return
+               }
+               // Groups that belong to another catalog are ignored.
+               if g.GetCatalog() != commonv1.Catalog_CATALOG_MEASURE {
+                       return
+               }
+               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                       Typ:      resourceSchema.EventDelete,
+                       Kind:     resourceSchema.EventKindGroup,
+                       Metadata: g.GetMetadata(),
+               })
+       case schema.KindMeasure:
+               ms, ok := m.Spec.(*databasev1.Measure)
+               if !ok {
+                       sr.l.Warn().Msg("fail to convert message to Measure")
+                       return
+               }
+               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                       Typ:      resourceSchema.EventDelete,
+                       Kind:     resourceSchema.EventKindResource,
+                       Metadata: ms.GetMetadata(),
+               })
+       case schema.KindIndexRuleBinding:
+               irb, ok := m.Spec.(*databasev1.IndexRuleBinding)
+               if !ok {
+                       sr.l.Warn().Msg("fail to convert message to IndexRuleBinding")
+                       return
+               }
+               if irb.GetSubject().GetCatalog() != commonv1.Catalog_CATALOG_MEASURE {
+                       return
+               }
+               // NOTE(review): the lookup uses m.Name, whereas OnAddOrUpdate resolves
+               // the measure through irb.GetSubject().GetName(). Also confirm that
+               // removing a binding should emit EventDelete for the measure resource
+               // before switching the lookup to the subject name.
+               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+               stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{
+                       Name:  m.Name,
+                       Group: m.Group,
+               })
+               cancel()
+               if err != nil {
+                       sr.l.Error().Err(err).Msg("fail to get subject")
+                       return
+               }
+               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                       Typ:      resourceSchema.EventDelete,
+                       Kind:     resourceSchema.EventKindResource,
+                       Metadata: stm.GetMetadata(),
+               })
+       case schema.KindIndexRule:
+       default:
+       }
+}
+
+// loadMeasure fetches the resource registered under metadata and reports
+// whether it exists and is a *measure.
+func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) {
+       if res, found := sr.LoadResource(metadata); found {
+               m, isMeasure := res.(*measure)
+               return m, isMeasure
+       }
+       return nil, false
+}
+
+// Compile-time check that *supplier implements resourceSchema.ResourceSupplier.
+var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
+
+// supplier opens measure resources and databases for the schema repository.
+type supplier struct {
+       // path is passed to tsdb.OpenDatabase as the database location.
+       path     string
+       // metadata is the registry the measure schemas are read from.
+       metadata metadata.Repo
+       // l is handed to every measure opened by OpenResource.
+       l        *logger.Logger
+}
+
+// newSupplier returns a supplier holding the given path, metadata repository
+// and logger.
+func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier {
+       s := new(supplier)
+       s.path = path
+       s.metadata = metadata
+       s.l = l
+       return s
+}
+
+// OpenResource opens a measure from spec: the schema is asserted to be a
+// *databasev1.Measure and handed to openMeasure together with the index rules.
+func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resourceSchema.ResourceSpec) (resourceSchema.Resource, error) {
+       ms := measureSpec{
+               schema:     spec.Schema.(*databasev1.Measure),
+               indexRules: spec.IndexRules,
+       }
+       return openMeasure(shardNum, db, ms, s.l)
+}
+// ResourceSchema looks up the measure schema identified by md in the
+// supplier's own metadata repository, bounded by a five second timeout.
+// The repo argument is not used.
+func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
+       lookupCtx, done := context.WithTimeout(context.Background(), 5*time.Second)
+       defer done()
+       registry := s.metadata.MeasureRegistry()
+       return registry.GetMeasure(lookupCtx, md)
+}
+
+// OpenDB opens the time-series database for groupSchema at the supplier's
+// path, using the shard number from the group's resource options and plain
+// encoder/decoder pools sized by chunkSize.
+func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
+       return tsdb.OpenDatabase(
+               context.TODO(),
+               tsdb.DatabaseOpts{
+                       Location: s.path,
+                       // Nil-safe getters: a group without resource options must not
+                       // cause a nil-pointer dereference here.
+                       ShardNum: groupSchema.GetResourceOpts().GetShardNum(),
+                       EncodingMethod: tsdb.EncodingMethod{
+                               EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+                               DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+                       },
+               })
+}
diff --git a/banyand/stream/metadata_test.go b/banyand/measure/metadata_test.go
similarity index 60%
copy from banyand/stream/metadata_test.go
copy to banyand/measure/metadata_test.go
index 216bbd4..0dc8120 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package stream
+package measure_test
 
 import (
        "context"
@@ -27,6 +27,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/event"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
 var _ = Describe("Metadata", func() {
@@ -44,24 +45,24 @@ var _ = Describe("Metadata", func() {
        Context("Manage group", func() {
                It("should pass smoke test", func() {
                        Eventually(func() bool {
-                               _, ok := 
svcs.stream.schemaRepo.LoadGroup("default")
+                               _, ok := svcs.measure.LoadGroup("default")
                                return ok
                        }).WithTimeout(10 * time.Second).Should(BeTrue())
                })
                It("should close the group", func() {
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
&shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2)
+                       
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
                        deleted, err := 
svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(deleted).Should(BeTrue())
                        Eventually(func() bool {
-                               _, ok := 
svcs.stream.schemaRepo.LoadGroup("default")
+                               _, ok := svcs.measure.LoadGroup("default")
                                return ok
                        }).WithTimeout(10 * time.Second).Should(BeFalse())
                })
 
                It("should add shards", func() {
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
&shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2)
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
&shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(4)
+                       
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
+                       
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
                        groupSchema, err := 
svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "default")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(groupSchema).ShouldNot(BeNil())
@@ -70,7 +71,7 @@ var _ = Describe("Metadata", func() {
                        
Expect(svcs.metadataService.GroupRegistry().UpdateGroup(context.TODO(), 
groupSchema)).Should(Succeed())
 
                        Eventually(func() bool {
-                               group, ok := 
svcs.stream.schemaRepo.LoadGroup("default")
+                               group, ok := svcs.measure.LoadGroup("default")
                                if !ok {
                                        return false
                                }
@@ -79,65 +80,65 @@ var _ = Describe("Metadata", func() {
                })
        })
 
-       Context("Manage stream", func() {
+       Context("Manage measure", func() {
                It("should pass smoke test", func() {
                        Eventually(func() bool {
-                               _, ok := 
svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
-                                       Name:  "sw",
+                               _, err := 
svcs.measure.Measure(&commonv1.Metadata{
+                                       Name:  "cpm",
                                        Group: "default",
                                })
-                               return ok
+                               return err == nil
                        }).WithTimeout(10 * time.Second).Should(BeTrue())
                })
-               It("should close the stream", func() {
-                       
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
&entityEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(1)
-                       deleted, err := 
svcs.metadataService.StreamRegistry().DeleteStream(context.TODO(), 
&commonv1.Metadata{
-                               Name:  "sw",
+               It("should close the measure", func() {
+                       
svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
+                       deleted, err := 
svcs.metadataService.MeasureRegistry().DeleteMeasure(context.TODO(), 
&commonv1.Metadata{
+                               Name:  "cpm",
                                Group: "default",
                        })
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(deleted).Should(BeTrue())
                        Eventually(func() bool {
-                               _, ok := 
svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
-                                       Name:  "sw",
+                               _, err := 
svcs.measure.Measure(&commonv1.Metadata{
+                                       Name:  "cpm",
                                        Group: "default",
                                })
-                               return ok
+                               // Report "still loadable" so BeFalse() waits for the removal.
+                               return err == nil
                        }).WithTimeout(10 * time.Second).Should(BeFalse())
                })
 
-               Context("Update a stream", func() {
-                       var streamSchema *databasev1.Stream
+               Context("Update a measure", func() {
+                       var measureSchema *databasev1.Measure
 
                        BeforeEach(func() {
                                var err error
-                               streamSchema, err = 
svcs.metadataService.StreamRegistry().GetStream(context.TODO(), 
&commonv1.Metadata{
-                                       Name:  "sw",
+                               measureSchema, err = 
svcs.metadataService.MeasureRegistry().GetMeasure(context.TODO(), 
&commonv1.Metadata{
+                                       Name:  "cpm",
                                        Group: "default",
                                })
 
                                Expect(err).ShouldNot(HaveOccurred())
-                               Expect(streamSchema).ShouldNot(BeNil())
+                               Expect(measureSchema).ShouldNot(BeNil())
                        })
 
-                       It("should update a new stream", func() {
-                               
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
&entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(1)
+                       It("should update a new measure", func() {
+                               
svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
                                // Remove the first tag from the entity
-                               streamSchema.Entity.TagNames = 
streamSchema.Entity.TagNames[1:]
-                               entitySize := len(streamSchema.Entity.TagNames)
+                               measureSchema.Entity.TagNames = 
measureSchema.Entity.TagNames[1:]
+                               entitySize := len(measureSchema.Entity.TagNames)
 
-                               
Expect(svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), 
streamSchema)).Should(Succeed())
+                               
Expect(svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(), 
measureSchema)).Should(Succeed())
 
                                Eventually(func() bool {
-                                       val, ok := 
svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
-                                               Name:  "sw",
+                                       val, err := 
svcs.measure.Measure(&commonv1.Metadata{
+                                               Name:  "cpm",
                                                Group: "default",
                                        })
-                                       if !ok {
+                                       if err != nil {
                                                return false
                                        }
 
-                                       return 
len(val.schema.GetEntity().TagNames) == entitySize
+                                       return 
len(val.GetSchema().GetEntity().TagNames) == entitySize
                                }).WithTimeout(10 * 
time.Second).Should(BeTrue())
                        })
                })
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
new file mode 100644
index 0000000..0181409
--- /dev/null
+++ b/banyand/measure/service.go
@@ -0,0 +1,156 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "time"
+
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
+)
+
+var (
+       // ErrEmptyRootPath is returned by Validate when the root path is empty.
+       ErrEmptyRootPath   = errors.New("root path is empty")
+       // ErrMeasureNotExist is returned by Measure when the requested measure
+       // cannot be loaded from the schema repository.
+       ErrMeasureNotExist = errors.New("measure doesn't exist")
+)
+
+// Service is the contract of the measure module: it combines the
+// run.PreRunner, run.Config and run.Service interfaces with Query.
+type Service interface {
+       run.PreRunner
+       run.Config
+       run.Service
+       Query
+}
+
+// Compile-time check that *service implements Service.
+var _ Service = (*service)(nil)
+
+// service is the Service implementation created by NewService.
+type service struct {
+       // schemaRepo is created in PreRun from root, metadata, repo and l.
+       schemaRepo    schemaRepo
+       // writeListener is subscribed to data.TopicMeasureWrite in PreRun.
+       writeListener *writeCallback
+       l             *logger.Logger
+       metadata      metadata.Repo
+       // root is bound to the "root-path" flag.
+       root          string
+       pipeline      queue.Queue
+       repo          discovery.ServiceRepo
+       // stop channel for the service
+       stopCh chan struct{}
+}
+
+// Measure returns the loaded measure identified by metadata, or
+// ErrMeasureNotExist (with a stack trace attached) when it is not available.
+func (s *service) Measure(metadata *commonv1.Metadata) (Measure, error) {
+       if m, found := s.schemaRepo.loadMeasure(metadata); found {
+               return m, nil
+       }
+       return nil, errors.WithStack(ErrMeasureNotExist)
+}
+// LoadGroup forwards the group lookup by name to the schema repository.
+func (s *service) LoadGroup(name string) (resourceSchema.Group, bool) {
+       group, found := s.schemaRepo.LoadGroup(name)
+       return group, found
+}
+
+// FlagSet declares the "storage" flag set; its "root-path" flag (default
+// "/tmp") is bound to the service's root field.
+func (s *service) FlagSet() *run.FlagSet {
+       fs := run.NewFlagSet("storage")
+       fs.StringVar(&s.root, "root-path", "/tmp", "the root path of database")
+       return fs
+}
+
+// Validate rejects an empty root path with ErrEmptyRootPath.
+func (s *service) Validate() error {
+       if s.root != "" {
+               return nil
+       }
+       return ErrEmptyRootPath
+}
+
+// Name returns the module name "measure"; PreRun uses it to obtain the logger.
+func (s *service) Name() string {
+       return "measure"
+}
+
+// PreRun prepares the service before serving: it obtains the logger, lists
+// all groups, builds the schema repository, stores every measure-catalog
+// group together with its measures, and finally subscribes the write
+// callback to data.TopicMeasureWrite. The first error encountered is returned.
+func (s *service) PreRun() error {
+       s.l = logger.GetLogger(s.Name())
+       listCtx, listCancel := context.WithTimeout(context.Background(), 5*time.Second)
+       groups, err := s.metadata.GroupRegistry().ListGroup(listCtx)
+       listCancel()
+       if err != nil {
+               return err
+       }
+       s.schemaRepo = newSchemaRepo(s.root, s.metadata, s.repo, s.l)
+       for _, group := range groups {
+               // Only groups of the measure catalog are handled here.
+               if group.Catalog != commonv1.Catalog_CATALOG_MEASURE {
+                       continue
+               }
+               stored, storeErr := s.schemaRepo.StoreGroup(group.Metadata)
+               if storeErr != nil {
+                       return storeErr
+               }
+               measureCtx, measureCancel := context.WithTimeout(context.Background(), 5*time.Second)
+               opt := schema.ListOpt{Group: stored.GetSchema().GetMetadata().Name}
+               measures, listErr := s.metadata.MeasureRegistry().ListMeasure(measureCtx, opt)
+               measureCancel()
+               if listErr != nil {
+                       return listErr
+               }
+               for _, ms := range measures {
+                       if _, resErr := stored.StoreResource(ms); resErr != nil {
+                               return resErr
+                       }
+               }
+       }
+
+       s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
+
+       return s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
+}
+
+// Serve calls NotifyAll on the schema repository, starts its watcher in a
+// goroutine, registers the repository as handler for group, measure, index
+// rule binding and index rule changes, and returns a freshly created stop
+// channel that GracefulStop closes.
+func (s *service) Serve() run.StopNotify {
+       // The value returned by NotifyAll is deliberately discarded.
+       _ = s.schemaRepo.NotifyAll()
+       // run a serial watcher
+       go s.schemaRepo.Watcher()
+
+       
s.metadata.MeasureRegistry().RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule,
+               &s.schemaRepo)
+
+       s.stopCh = make(chan struct{})
+       return s.stopCh
+}
+
+// GracefulStop closes the schema repository and then the stop channel, if
+// Serve has created one.
+func (s *service) GracefulStop() {
+       s.schemaRepo.Close()
+       if s.stopCh == nil {
+               return
+       }
+       close(s.stopCh)
+}
+
+// NewService builds a measure service wired to the given metadata repository,
+// service discovery repository and queue. The context is ignored and the
+// returned error is always nil.
+func NewService(_ context.Context, metadata metadata.Repo, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
+       svc := new(service)
+       svc.metadata = metadata
+       svc.repo = repo
+       svc.pipeline = pipeline
+       return svc, nil
+}
diff --git a/banyand/measure/testdata/query_data.json 
b/banyand/measure/testdata/query_data.json
new file mode 100644
index 0000000..4eb3e7e
--- /dev/null
+++ b/banyand/measure/testdata/query_data.json
@@ -0,0 +1,107 @@
+[
+       {
+               "tag_families": [
+                       {
+                               "tags": [
+                                       {
+                                               "str": {
+                                                       "value": "1"
+                                               }
+                                       },
+                                       {
+                                               "str": {
+                                                       "value": "minute"
+                                               }
+                                       }
+                               ]
+                       }
+               ],
+               "fields": [
+                       {
+                               "int": {
+                                       "value": 100
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 100
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 1
+                               }
+                       }
+               ]
+       },
+       {
+               "tag_families": [
+                       {
+                               "tags": [
+                                       {
+                                               "str": {
+                                                       "value": "1"
+                                               }
+                                       },
+                                       {
+                                               "str": {
+                                                       "value": "minute"
+                                               }
+                                       }
+                               ]
+                       }
+               ],
+               "fields": [
+                       {
+                               "int": {
+                                       "value": 200
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 50
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 4
+                               }
+                       }
+               ]
+       },
+       {
+               "tag_families": [
+                       {
+                               "tags": [
+                                       {
+                                               "str": {
+                                                       "value": "1"
+                                               }
+                                       },
+                                       {
+                                               "str": {
+                                                       "value": "minute"
+                                               }
+                                       }
+                               ]
+                       }
+               ],
+               "fields": [
+                       {
+                               "int": {
+                                       "value": 150
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 300
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 5
+                               }
+                       }
+               ]
+       }
+]
\ No newline at end of file
diff --git a/banyand/measure/testdata/write_data.json 
b/banyand/measure/testdata/write_data.json
new file mode 100644
index 0000000..fbfe02f
--- /dev/null
+++ b/banyand/measure/testdata/write_data.json
@@ -0,0 +1,107 @@
+[
+       {
+               "tag_families": [
+                       {
+                               "tags": [
+                                       {
+                                               "str": {
+                                                       "value": "1"
+                                               }
+                                       },
+                                       {
+                                               "str": {
+                                                       "value": "minute"
+                                               }
+                                       }
+                               ]
+                       }
+               ],
+               "fields": [
+                       {
+                               "int": {
+                                       "value": 100
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 100
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 1
+                               }
+                       }
+               ]
+       },
+       {
+               "tag_families": [
+                       {
+                               "tags": [
+                                       {
+                                               "str": {
+                                                       "value": "1"
+                                               }
+                                       },
+                                       {
+                                               "str": {
+                                                       "value": "minute"
+                                               }
+                                       }
+                               ]
+                       }
+               ],
+               "fields": [
+                       {
+                               "int": {
+                                       "value": 100
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 100
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 1
+                               }
+                       }
+               ]
+       },
+       {
+               "tag_families": [
+                       {
+                               "tags": [
+                                       {
+                                               "str": {
+                                                       "value": "1"
+                                               }
+                                       },
+                                       {
+                                               "str": {
+                                                       "value": "minute"
+                                               }
+                                       }
+                               ]
+                       }
+               ],
+               "fields": [
+                       {
+                               "int": {
+                                       "value": 100
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 100
+                               }
+                       },
+                       {
+                               "int": {
+                                       "value": 1
+                               }
+                       }
+               ]
+       }
+]
\ No newline at end of file
diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go
index 216bbd4..a980b2c 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/stream/metadata_test.go
@@ -27,6 +27,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/event"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
 var _ = Describe("Metadata", func() {
@@ -49,7 +50,7 @@ var _ = Describe("Metadata", func() {
                        }).WithTimeout(10 * time.Second).Should(BeTrue())
                })
                It("should close the group", func() {
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
&shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2)
+                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
                        deleted, err := 
svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(deleted).Should(BeTrue())
@@ -60,8 +61,8 @@ var _ = Describe("Metadata", func() {
                })
 
                It("should add shards", func() {
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
&shardEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(2)
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
&shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(4)
+                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
+                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
                        groupSchema, err := 
svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "default")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(groupSchema).ShouldNot(BeNil())
@@ -90,7 +91,7 @@ var _ = Describe("Metadata", func() {
                        }).WithTimeout(10 * time.Second).Should(BeTrue())
                })
                It("should close the stream", func() {
-                       
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
&entityEventMatcher{action: databasev1.Action_ACTION_DELETE}).Times(1)
+                       
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
                        deleted, err := 
svcs.metadataService.StreamRegistry().DeleteStream(context.TODO(), 
&commonv1.Metadata{
                                Name:  "sw",
                                Group: "default",
@@ -121,7 +122,7 @@ var _ = Describe("Metadata", func() {
                        })
 
                        It("should update a new stream", func() {
-                               
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
&entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(1)
+                               
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
                                // Remove the first tag from the entity
                                streamSchema.Entity.TagNames = 
streamSchema.Entity.TagNames[1:]
                                entitySize := len(streamSchema.Entity.TagNames)
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 2856938..83854ac 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
@@ -69,20 +70,10 @@ func (s *stream) Close() error {
        return s.indexWriter.Close()
 }
 
-func parseMaxModRevision(indexRules []*databasev1.IndexRule) 
(maxRevisionForIdxRules int64) {
-       maxRevisionForIdxRules = int64(0)
-       for _, idxRule := range indexRules {
-               if idxRule.GetMetadata().GetModRevision() > 
maxRevisionForIdxRules {
-                       maxRevisionForIdxRules = 
idxRule.GetMetadata().GetModRevision()
-               }
-       }
-       return
-}
-
 func (s *stream) parseSpec() {
        s.name, s.group = s.schema.GetMetadata().GetName(), 
s.schema.GetMetadata().GetGroup()
        s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), 
s.schema.GetEntity())
-       s.maxObservedModRevision = parseMaxModRevision(s.indexRules)
+       s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
 }
 
 type streamSpec struct {
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 2896f0d..493966f 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -55,7 +55,7 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, 
error) {
        wrap := func(shards []tsdb.Shard) []tsdb.Shard {
                result := make([]tsdb.Shard, len(shards))
                for i := 0; i < len(shards); i++ {
-                       result[i] = newShardDelegate(tsdb.Entry(s.name), 
shards[i])
+                       result[i] = tsdb.NewScopedShard(tsdb.Entry(s.name), 
shards[i])
                }
                return result
        }
@@ -76,7 +76,7 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, 
error) {
        if err != nil {
                return nil, err
        }
-       return []tsdb.Shard{newShardDelegate(tsdb.Entry(s.name), shard)}, nil
+       return []tsdb.Shard{tsdb.NewScopedShard(tsdb.Entry(s.name), shard)}, nil
 }
 
 func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
@@ -84,7 +84,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) 
{
        if err != nil {
                return nil, err
        }
-       return newShardDelegate(tsdb.Entry(s.name), shard), nil
+       return tsdb.NewScopedShard(tsdb.Entry(s.name), shard), nil
 }
 
 func (s *stream) ParseTagFamily(family string, item tsdb.Item) 
(*modelv1.TagFamily, error) {
@@ -128,68 +128,3 @@ func (s *stream) ParseElementID(item tsdb.Item) (string, 
error) {
        }
        return string(rawBytes), nil
 }
-
-var _ tsdb.Shard = (*shardDelegate)(nil)
-
-type shardDelegate struct {
-       scope     tsdb.Entry
-       delegated tsdb.Shard
-}
-
-func newShardDelegate(scope tsdb.Entry, delegated tsdb.Shard) tsdb.Shard {
-       return &shardDelegate{
-               scope:     scope,
-               delegated: delegated,
-       }
-}
-
-func (sd *shardDelegate) Close() error {
-       // the delegate can't close the underlying shard
-       return nil
-}
-
-func (sd *shardDelegate) ID() common.ShardID {
-       return sd.delegated.ID()
-}
-
-func (sd *shardDelegate) Series() tsdb.SeriesDatabase {
-       return &seriesDatabaseDelegate{
-               scope:     sd.scope,
-               delegated: sd.delegated.Series(),
-       }
-}
-
-func (sd *shardDelegate) Index() tsdb.IndexDatabase {
-       return sd.delegated.Index()
-}
-
-func (sd *shardDelegate) State() tsdb.ShardState {
-       return sd.delegated.State()
-}
-
-var _ tsdb.SeriesDatabase = (*seriesDatabaseDelegate)(nil)
-
-type seriesDatabaseDelegate struct {
-       scope     tsdb.Entry
-       delegated tsdb.SeriesDatabase
-}
-
-func (sdd *seriesDatabaseDelegate) Close() error {
-       return nil
-}
-
-func (sdd *seriesDatabaseDelegate) GetByHashKey(key []byte) (tsdb.Series, 
error) {
-       return sdd.delegated.GetByHashKey(key)
-}
-
-func (sdd *seriesDatabaseDelegate) GetByID(id common.SeriesID) (tsdb.Series, 
error) {
-       return sdd.delegated.GetByID(id)
-}
-
-func (sdd *seriesDatabaseDelegate) Get(entity tsdb.Entity) (tsdb.Series, 
error) {
-       return sdd.delegated.Get(entity.Prepend(sdd.scope))
-}
-
-func (sdd *seriesDatabaseDelegate) List(path tsdb.Path) (tsdb.SeriesList, 
error) {
-       return sdd.delegated.List(path.Prepand(sdd.scope))
-}
diff --git a/banyand/stream/stream_query_test.go 
b/banyand/stream/stream_query_test.go
index cca1637..cae08e6 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -54,26 +54,27 @@ type shardStruct struct {
 type shardsForTest []shardStruct
 
 var _ = Describe("Write", func() {
-       var (
-               s       *stream
-               deferFn func()
-       )
 
-       BeforeEach(func() {
-               var svcs *services
-               svcs, deferFn = setUp()
-               var ok bool
-               s, ok = svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
-                       Name:  "sw",
-                       Group: "default",
+       Context("Select shard", func() {
+               var (
+                       s       *stream
+                       deferFn func()
+               )
+
+               BeforeEach(func() {
+                       var svcs *services
+                       svcs, deferFn = setUp()
+                       var ok bool
+                       s, ok = 
svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
+                               Name:  "sw",
+                               Group: "default",
+                       })
+                       Expect(ok).To(BeTrue())
                })
-               Expect(ok).To(BeTrue())
-       })
 
-       AfterEach(func() {
-               deferFn()
-       })
-       Context("Select shard", func() {
+               AfterEach(func() {
+                       deferFn()
+               })
                tests := []struct {
                        name         string
                        entity       tsdb.Entity
@@ -107,11 +108,26 @@ var _ = Describe("Write", func() {
                        })
                }
        })
-       Context("Querying by local indices", func() {
-               var now time.Time
-               BeforeEach(func() {
+       Context("Querying by local indices", Ordered, func() {
+               var (
+                       s       *stream
+                       now     time.Time
+                       deferFn func()
+               )
+               BeforeAll(func() {
+                       var svcs *services
+                       svcs, deferFn = setUp()
+                       var ok bool
+                       s, ok = 
svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
+                               Name:  "sw",
+                               Group: "default",
+                       })
+                       Expect(ok).To(BeTrue())
                        now = setupQueryData("multiple_shards.json", s)
                })
+               AfterAll(func() {
+                       deferFn()
+               })
                When("", func() {
                        l1 := []string{fmt.Sprintf("series_%d", 
tsdb.SeriesID(tsdb.Entity{
                                tsdb.Entry("sw"),
@@ -581,10 +597,25 @@ var _ = Describe("Write", func() {
                })
        })
 
-       Context("Querying by global indices", func() {
-               BeforeEach(func() {
+       Context("Querying by global indices", Ordered, func() {
+               var (
+                       s       *stream
+                       deferFn func()
+               )
+               BeforeAll(func() {
+                       var svcs *services
+                       svcs, deferFn = setUp()
+                       var ok bool
+                       s, ok = 
svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
+                               Name:  "sw",
+                               Group: "default",
+                       })
+                       Expect(ok).To(BeTrue())
                        _ = setupQueryData("global_index.json", s)
                })
+               AfterAll(func() {
+                       deferFn()
+               })
                DescribeTable("", func(traceID string, wantTraceSegmentNum int, 
wantErr bool) {
                        shards, errShards := s.Shards(nil)
                        Expect(errShards).ShouldNot(HaveOccurred())
diff --git a/banyand/stream/stream_suite_test.go 
b/banyand/stream/stream_suite_test.go
index 53b7618..b3d3f60 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -19,7 +19,6 @@ package stream
 
 import (
        "context"
-       "fmt"
        "testing"
 
        "github.com/golang/mock/gomock"
@@ -31,7 +30,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
        teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
@@ -50,47 +48,6 @@ var _ = BeforeSuite(func() {
        })).To(Succeed())
 })
 
-var (
-       _ gomock.Matcher = (*shardEventMatcher)(nil)
-       _ gomock.Matcher = (*entityEventMatcher)(nil)
-)
-
-type shardEventMatcher struct {
-       action databasev1.Action
-}
-
-func (s *shardEventMatcher) Matches(x interface{}) bool {
-       if m, messageOk := x.(bus.Message); messageOk {
-               if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk {
-                       return evt.Action == s.action
-               }
-       }
-
-       return false
-}
-
-func (s *shardEventMatcher) String() string {
-       return fmt.Sprintf("shard-event-matcher(%s)", 
databasev1.Action_name[int32(s.action)])
-}
-
-type entityEventMatcher struct {
-       action databasev1.Action
-}
-
-func (s *entityEventMatcher) Matches(x interface{}) bool {
-       if m, messageOk := x.(bus.Message); messageOk {
-               if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk {
-                       return evt.Action == s.action
-               }
-       }
-
-       return false
-}
-
-func (s *entityEventMatcher) String() string {
-       return fmt.Sprintf("entity-event-matcher(%s)", 
databasev1.Action_name[int32(s.action)])
-}
-
 // service to preload stream
 type preloadStreamService struct {
        metaSvc metadata.Service
@@ -117,8 +74,8 @@ func setUp() (*services, func()) {
        repo := discovery.NewMockServiceRepo(ctrl)
        repo.EXPECT().NodeID().AnyTimes()
        // Both PreRun and Serve phases send events
-       repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
&entityEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(2 * 1)
-       repo.EXPECT().Publish(event.StreamTopicShardEvent, 
&shardEventMatcher{action: databasev1.Action_ACTION_PUT}).Times(2 * 2)
+       repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 1)
+       repo.EXPECT().Publish(event.StreamTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2)
 
        // Init Pipeline
        pipeline, err := queue.NewQueue(context.TODO(), repo)
diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go
new file mode 100644
index 0000000..cbbe687
--- /dev/null
+++ b/banyand/tsdb/scope.go
@@ -0,0 +1,85 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb
+
+import "github.com/apache/skywalking-banyandb/api/common"
+
+var _ Shard = (*ScopedShard)(nil)
+
+type ScopedShard struct {
+       scope     Entry
+       delegated Shard
+}
+
+func NewScopedShard(scope Entry, delegated Shard) Shard {
+       return &ScopedShard{
+               scope:     scope,
+               delegated: delegated,
+       }
+}
+
+func (sd *ScopedShard) Close() error {
+       // the delegate can't close the underlying shard
+       return nil
+}
+
+func (sd *ScopedShard) ID() common.ShardID {
+       return sd.delegated.ID()
+}
+
+func (sd *ScopedShard) Series() SeriesDatabase {
+       return &scopedSeriesDatabase{
+               scope:     sd.scope,
+               delegated: sd.delegated.Series(),
+       }
+}
+
+func (sd *ScopedShard) Index() IndexDatabase {
+       return sd.delegated.Index()
+}
+
+func (sd *ScopedShard) State() ShardState {
+       return sd.delegated.State()
+}
+
+var _ SeriesDatabase = (*scopedSeriesDatabase)(nil)
+
+type scopedSeriesDatabase struct {
+       scope     Entry
+       delegated SeriesDatabase
+}
+
+func (sdd *scopedSeriesDatabase) Close() error {
+       return nil
+}
+
+func (sdd *scopedSeriesDatabase) GetByHashKey(key []byte) (Series, error) {
+       return sdd.delegated.GetByHashKey(key)
+}
+
+func (sdd *scopedSeriesDatabase) GetByID(id common.SeriesID) (Series, error) {
+       return sdd.delegated.GetByID(id)
+}
+
+func (sdd *scopedSeriesDatabase) Get(entity Entity) (Series, error) {
+       return sdd.delegated.Get(entity.Prepend(sdd.scope))
+}
+
+func (sdd *scopedSeriesDatabase) List(path Path) (SeriesList, error) {
+       return sdd.delegated.List(path.Prepand(sdd.scope))
+}
diff --git a/pkg/test/matcher.go b/pkg/test/matcher.go
new file mode 100644
index 0000000..f218bf0
--- /dev/null
+++ b/pkg/test/matcher.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package test
+
+import (
+       "fmt"
+
+       "github.com/golang/mock/gomock"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+var (
+       _ gomock.Matcher = (*shardEventMatcher)(nil)
+       _ gomock.Matcher = (*entityEventMatcher)(nil)
+)
+
+type shardEventMatcher struct {
+       action databasev1.Action
+}
+
+func NewShardEventMatcher(action databasev1.Action) gomock.Matcher {
+       return &shardEventMatcher{
+               action: action,
+       }
+}
+
+func (s *shardEventMatcher) Matches(x interface{}) bool {
+       if m, messageOk := x.(bus.Message); messageOk {
+               if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk {
+                       return evt.Action == s.action
+               }
+       }
+
+       return false
+}
+
+func (s *shardEventMatcher) String() string {
+       return fmt.Sprintf("shard-event-matcher(%s)", 
databasev1.Action_name[int32(s.action)])
+}
+
+type entityEventMatcher struct {
+       action databasev1.Action
+}
+
+func NewEntityEventMatcher(action databasev1.Action) gomock.Matcher {
+       return &entityEventMatcher{
+               action: action,
+       }
+}
+
+func (s *entityEventMatcher) Matches(x interface{}) bool {
+       if m, messageOk := x.(bus.Message); messageOk {
+               if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk {
+                       return evt.Action == s.action
+               }
+       }
+
+       return false
+}
+
+func (s *entityEventMatcher) String() string {
+       return fmt.Sprintf("entity-event-matcher(%s)", 
databasev1.Action_name[int32(s.action)])
+}
diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go
index c08968e..9f20108 100644
--- a/pkg/test/measure/etcd.go
+++ b/pkg/test/measure/etcd.go
@@ -21,13 +21,13 @@ import (
        "context"
        "embed"
        "fmt"
-       "math/rand"
        "os"
        "path"
 
        "github.com/google/uuid"
        "google.golang.org/protobuf/encoding/protojson"
 
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 )
@@ -41,9 +41,18 @@ var (
        indexRuleBindingJSON string
        //go:embed testdata/measure.json
        measureJSON string
+       //go:embed testdata/group.json
+       groupJSON string
 )
 
 func PreloadSchema(e schema.Registry) error {
+       g := &commonv1.Group{}
+       if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
+               return err
+       }
+       if err := e.UpdateGroup(context.TODO(), g); err != nil {
+               return err
+       }
        s := &databasev1.Measure{}
        if err := protojson.Unmarshal([]byte(measureJSON), s); err != nil {
                return err
@@ -88,9 +97,3 @@ func PreloadSchema(e schema.Registry) error {
 func RandomTempDir() string {
        return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", 
uuid.New().String()))
 }
-
-func RandomUnixDomainListener() (string, string) {
-       i := rand.Uint64()
-       return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i),
-               fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
-}
diff --git a/pkg/test/measure/testdata/group.json 
b/pkg/test/measure/testdata/group.json
new file mode 100644
index 0000000..c2b6c02
--- /dev/null
+++ b/pkg/test/measure/testdata/group.json
@@ -0,0 +1,27 @@
+{
+  "metadata": {
+    "name": "default"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "interval_rules": [
+      {
+        "tag_name": "scope",
+        "str": "minute",
+        "interval": "1m"
+      },
+      {
+        "tag_name": "scope",
+        "str": "hour",
+        "interval": "1h"
+      },
+      {
+        "tag_name": "scope",
+        "str": "day",
+        "interval": "1d"
+      }
+    ]
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measure.json 
b/pkg/test/measure/testdata/measure.json
index a0ab240..47e341a 100644
--- a/pkg/test/measure/testdata/measure.json
+++ b/pkg/test/measure/testdata/measure.json
@@ -43,29 +43,5 @@
       "entity_id"
     ]
   },
-  "interval_rules": [
-    {
-      "tag_name": "scope",
-      "str": "minute",
-      "interval": "1m"
-    },
-    {
-      "tag_name": "scope",
-      "str": "hour",
-      "interval": "1h"
-    },
-    {
-      "tag_name": "scope",
-      "str": "day",
-      "interval": "1d"
-    }
-  ],
-  "opts": {
-    "shard_num": 2,
-    "ttl": {
-      "val": 7,
-      "unit": "DURATION_UNIT_DAY"
-    }
-  },
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file

Reply via email to