This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-column
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a04a05f9daa2bcc4d7be0bb108b7353792b573a5
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Dec 10 22:04:56 2023 +0800

    Support measure query
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/internal/storage/index.go                  |  46 ++-
 banyand/internal/storage/index_test.go             |   7 +-
 banyand/internal/storage/storage.go                |   9 +-
 banyand/internal/storage/tsdb.go                   |  14 +-
 banyand/measure/block.go                           | 206 +++++++++--
 banyand/measure/block_metadata.go                  | 134 ++++++-
 .../{field_flag_test.go => block_reader.go}        |  38 +-
 banyand/measure/block_writer.go                    |  16 +-
 banyand/measure/column.go                          |  78 ++--
 banyand/measure/column_metadata.go                 |  40 ++-
 banyand/measure/datapoints.go                      |   5 +-
 banyand/measure/measure_query.go                   | 394 ++++++++++++++++++++-
 banyand/measure/measure_topn.go                    |  73 ++--
 banyand/measure/measure_write.go                   |  27 +-
 banyand/measure/part.go                            |  44 ++-
 banyand/measure/part_iter.go                       | 258 ++++++++++++++
 banyand/measure/primary_metadata.go                |  71 +++-
 banyand/measure/tstable.go                         | 159 ++++++++-
 pkg/bytes/buffer.go                                |  10 +-
 pkg/bytes/{buffer.go => resize.go}                 |  57 +--
 pkg/fs/file_system.go                              |  23 +-
 pkg/pb/v1/metadata.go                              |  52 +++
 {banyand/internal/storage => pkg/pb/v1}/series.go  |  63 +---
 .../internal/storage => pkg/pb/v1}/series_test.go  |   6 +-
 {banyand/internal/storage => pkg/pb/v1}/value.go   |  92 ++++-
 .../internal/storage => pkg/pb/v1}/value_test.go   |   2 +-
 pkg/query/executor/interface.go                    |   4 +-
 pkg/query/logical/common.go                        |   6 +-
 pkg/query/logical/expr.go                          |   2 +-
 pkg/query/logical/index_filter.go                  | 294 ++++++++++++++-
 .../measure/measure_plan_indexscan_local.go        | 253 +++++--------
 pkg/query/logical/measure/topn_plan_localscan.go   | 145 +++-----
 pkg/query/logical/schema.go                        |   4 +-
 pkg/query/logical/stream/stream_plan_tag_filter.go |   2 +-
 34 files changed, 2051 insertions(+), 583 deletions(-)

diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index 5081db54..780a2fea 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -26,27 +26,21 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 func (d *database[T]) IndexDB() IndexDB {
        return d.index
 }
 
-func (d *database[T]) Lookup(ctx context.Context, series *Series) (SeriesList, 
error) {
+func (d *database[T]) Lookup(ctx context.Context, series *pbv1.Series) 
(pbv1.SeriesList, error) {
        return d.index.searchPrimary(ctx, series)
 }
 
-// OrderBy specifies the order of the result.
-type OrderBy struct {
-       Index *databasev1.IndexRule
-       Sort  modelv1.Sort
-}
-
 type seriesIndex struct {
        store    index.SeriesStore
        l        *logger.Logger
@@ -69,8 +63,8 @@ func newSeriesIndex(ctx context.Context, root string) 
(*seriesIndex, error) {
 
 var entityKey = index.FieldKey{}
 
-func (s *seriesIndex) createPrimary(series *Series) (*Series, error) {
-       if err := series.marshal(); err != nil {
+func (s *seriesIndex) createPrimary(series *pbv1.Series) (*pbv1.Series, error) 
{
+       if err := series.Marshal(); err != nil {
                return nil, err
        }
        id, err := s.store.Search(series.Buffer)
@@ -97,7 +91,7 @@ func (s *seriesIndex) Write(docs index.Documents) error {
 
 var rangeOpts = index.RangeOpts{}
 
-func (s *seriesIndex) searchPrimary(ctx context.Context, series *Series) 
(SeriesList, error) {
+func (s *seriesIndex) searchPrimary(ctx context.Context, series *pbv1.Series) 
(pbv1.SeriesList, error) {
        var hasAny, hasWildcard bool
        var prefixIndex int
 
@@ -105,7 +99,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, 
series *Series) (Series
                if tv == nil {
                        return nil, errors.New("nil tag value")
                }
-               if tv.Value == AnyEntry {
+               if tv == pbv1.AnyTagValue {
                        if !hasAny {
                                hasAny = true
                                prefixIndex = i
@@ -123,7 +117,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, 
series *Series) (Series
        if hasAny {
                var ss []index.Series
                if hasWildcard {
-                       if err = series.marshal(); err != nil {
+                       if err = series.Marshal(); err != nil {
                                return nil, err
                        }
                        ss, err = s.store.SearchWildcard(series.Buffer)
@@ -133,7 +127,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, 
series *Series) (Series
                        return convertIndexSeriesToSeriesList(ss)
                }
                series.EntityValues = series.EntityValues[:prefixIndex]
-               if err = series.marshal(); err != nil {
+               if err = series.Marshal(); err != nil {
                        return nil, err
                }
                ss, err = s.store.SearchPrefix(series.Buffer)
@@ -142,7 +136,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, 
series *Series) (Series
                }
                return convertIndexSeriesToSeriesList(ss)
        }
-       if err = series.marshal(); err != nil {
+       if err = series.Marshal(); err != nil {
                return nil, err
        }
        var seriesID common.SeriesID
@@ -152,17 +146,17 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, 
series *Series) (Series
        }
        if seriesID > 0 {
                series.ID = seriesID
-               return SeriesList{series}, nil
+               return pbv1.SeriesList{series}, nil
        }
        return nil, nil
 }
 
-func convertIndexSeriesToSeriesList(indexSeries []index.Series) (SeriesList, 
error) {
-       seriesList := make(SeriesList, 0, len(indexSeries))
+func convertIndexSeriesToSeriesList(indexSeries []index.Series) 
(pbv1.SeriesList, error) {
+       seriesList := make(pbv1.SeriesList, 0, len(indexSeries))
        for _, s := range indexSeries {
-               var series Series
+               var series pbv1.Series
                series.ID = s.ID
-               if err := series.unmarshal(s.EntityValues); err != nil {
+               if err := series.Unmarshal(s.EntityValues); err != nil {
                        return nil, err
                }
                seriesList = append(seriesList, &series)
@@ -170,13 +164,13 @@ func convertIndexSeriesToSeriesList(indexSeries 
[]index.Series) (SeriesList, err
        return seriesList, nil
 }
 
-func (s *seriesIndex) Search(ctx context.Context, series *Series, filter 
index.Filter, order *OrderBy) (SeriesList, error) {
+func (s *seriesIndex) Search(ctx context.Context, series *pbv1.Series, filter 
index.Filter, order *pbv1.OrderBy) (pbv1.SeriesList, error) {
        seriesList, err := s.searchPrimary(ctx, series)
        if err != nil {
                return nil, err
        }
 
-       pl := seriesList.toList()
+       pl := seriesList.ToList()
        if filter != nil {
                var plFilter posting.List
                plFilter, err = filter.Execute(func(ruleType 
databasev1.IndexRule_Type) (index.Searcher, error) {
@@ -190,7 +184,7 @@ func (s *seriesIndex) Search(ctx context.Context, series 
*Series, filter index.F
                }
        }
 
-       if order == nil {
+       if order == nil || order.Index == nil {
                return filterSeriesList(seriesList, pl), nil
        }
 
@@ -205,7 +199,7 @@ func (s *seriesIndex) Search(ctx context.Context, series 
*Series, filter index.F
                err = multierr.Append(err, iter.Close())
        }()
 
-       var sortedSeriesList SeriesList
+       var sortedSeriesList pbv1.SeriesList
        for iter.Next() {
                pv := iter.Val().Value
                if err = pv.Intersect(pl); err != nil {
@@ -222,7 +216,7 @@ func (s *seriesIndex) Search(ctx context.Context, series 
*Series, filter index.F
        return sortedSeriesList, err
 }
 
-func filterSeriesList(seriesList SeriesList, filter posting.List) SeriesList {
+func filterSeriesList(seriesList pbv1.SeriesList, filter posting.List) 
pbv1.SeriesList {
        for i := 0; i < len(seriesList); i++ {
                if !filter.Contains(uint64(seriesList[i].ID)) {
                        seriesList = append(seriesList[:i], seriesList[i+1:]...)
@@ -232,7 +226,7 @@ func filterSeriesList(seriesList SeriesList, filter 
posting.List) SeriesList {
        return seriesList
 }
 
-func appendSeriesList(dest, src SeriesList, filter posting.List) SeriesList {
+func appendSeriesList(dest, src pbv1.SeriesList, filter posting.List) 
pbv1.SeriesList {
        for i := 0; i < len(src); i++ {
                if !filter.Contains(uint64(src[i].ID)) {
                        continue
diff --git a/banyand/internal/storage/index_test.go 
b/banyand/internal/storage/index_test.go
index 5f5fc72a..97280850 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -27,11 +27,12 @@ import (
 
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
-var testSeriesPool SeriesPool
+var testSeriesPool pbv1.SeriesPool
 
 func TestSeriesIndex_Primary(t *testing.T) {
        ctx := context.Background()
@@ -86,7 +87,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
                        subject: "service_instance_latency",
                        entityValues: []*modelv1.TagValue{
                                {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "svc_1"}}},
-                               {Value: AnyEntry},
+                               pbv1.AnyTagValue,
                        },
                        expected: []*modelv1.TagValue{
                                {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "svc_1"}}},
@@ -97,7 +98,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
                        name:    "Wildcard",
                        subject: "service_instance_latency",
                        entityValues: []*modelv1.TagValue{
-                               {Value: AnyEntry},
+                               pbv1.AnyTagValue,
                                {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "svc_1_instance_1"}}},
                        },
                        expected: []*modelv1.TagValue{
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index 0bb912b0..d2dd5d72 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -35,6 +35,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -64,16 +65,16 @@ type SupplyTSDB[T TSTable] func() T
 
 type IndexDB interface {
        Write(docs index.Documents) error
-       Search(ctx context.Context, series *Series, filter index.Filter, order 
*OrderBy) (SeriesList, error)
+       Search(ctx context.Context, series *pbv1.Series, filter index.Filter, 
order *pbv1.OrderBy) (pbv1.SeriesList, error)
 }
 
 // TSDB allows listing and getting shard details.
 type TSDB[T TSTable] interface {
        io.Closer
-       Register(shardID common.ShardID, series *Series) (*Series, error)
-       Lookup(ctx context.Context, series *Series) (SeriesList, error)
+       Register(shardID common.ShardID, series *pbv1.Series) (*pbv1.Series, 
error)
+       Lookup(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, 
error)
        CreateTSTableIfNotExist(shardID common.ShardID, ts time.Time) 
(TSTableWrapper[T], error)
-       SelectTSTables(shardID common.ShardID, timeRange timestamp.TimeRange) 
([]TSTableWrapper[T], error)
+       SelectTSTables(timeRange timestamp.TimeRange) []TSTableWrapper[T]
        IndexDB() IndexDB
 }
 
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 8c5817fc..d6fdfcad 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -36,6 +36,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -114,7 +115,7 @@ func OpenTSDB[T TSTable](ctx context.Context, opts 
TSDBOpts[T]) (TSDB[T], error)
        return db, nil
 }
 
-func (d *database[T]) Register(shardID common.ShardID, series *Series) 
(*Series, error) {
+func (d *database[T]) Register(shardID common.ShardID, series *pbv1.Series) 
(*pbv1.Series, error) {
        var err error
        if series, err = d.index.createPrimary(series); err != nil {
                return nil, err
@@ -144,11 +145,14 @@ func (d *database[T]) CreateTSTableIfNotExist(shardID 
common.ShardID, ts time.Ti
        return d.sLst[shardID].segmentController.createTSTable(timeRange.Start)
 }
 
-func (d *database[T]) SelectTSTables(shardID common.ShardID, timeRange 
timestamp.TimeRange) ([]TSTableWrapper[T], error) {
-       if int(shardID) >= int(atomic.LoadUint32(&d.sLen)) {
-               return nil, ErrUnknownShard
+func (d *database[T]) SelectTSTables(timeRange timestamp.TimeRange) 
[]TSTableWrapper[T] {
+       var result []TSTableWrapper[T]
+       d.RLock()
+       for i := range d.sLst {
+               result = append(result, 
d.sLst[i].segmentController.selectTSTables(timeRange)...)
        }
-       return d.sLst[shardID].segmentController.selectTSTables(timeRange), nil
+       d.RUnlock()
+       return result
 }
 
 func (d *database[T]) registerShard(id int) error {
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index dabaf9d5..0f512113 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -21,16 +21,18 @@ import (
        "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type block struct {
        timestamps []int64
 
-       tagFamilies []columnFamily
+       tagFamilies []ColumnFamily
 
-       field columnFamily
+       field ColumnFamily
 }
 
 func (b *block) reset() {
@@ -56,16 +58,16 @@ func (b *block) assertValid() {
        itemsCount := len(timestamps)
        tff := b.tagFamilies
        for _, tf := range tff {
-               for _, c := range tf.columns {
-                       if len(c.values) != itemsCount {
-                               logger.Panicf("unexpected number of values for 
tags %q: got %d; want %d", c.name, len(c.values), itemsCount)
+               for _, c := range tf.Columns {
+                       if len(c.Values) != itemsCount {
+                               logger.Panicf("unexpected number of values for 
tags %q: got %d; want %d", c.Name, len(c.Values), itemsCount)
                        }
                }
        }
        ff := b.field
-       for _, f := range ff.columns {
-               if len(f.values) != itemsCount {
-                       logger.Panicf("unexpected number of values for fields 
%q: got %d; want %d", f.name, len(f.values), itemsCount)
+       for _, f := range ff.Columns {
+               if len(f.Values) != itemsCount {
+                       logger.Panicf("unexpected number of values for fields 
%q: got %d; want %d", f.Name, len(f.Values), itemsCount)
                }
        }
 }
@@ -107,23 +109,23 @@ func (b *block) processTagFamilies(tff []nameValues, i 
int, dataPointsLen int) {
        }
 }
 
-func (b *block) processTags(tf nameValues, cf columnFamily, i int, 
dataPointsLen int) {
+func (b *block) processTags(tf nameValues, cf ColumnFamily, i int, 
dataPointsLen int) {
        cf.resizeColumns(len(tf.values))
        for k, t := range tf.values {
-               b.processTag(t, cf.columns[k], i, dataPointsLen)
+               b.processTag(t, cf.Columns[k], i, dataPointsLen)
        }
 }
 
-func (b *block) processTag(t *nameValue, c column, i int, dataPointsLen int) {
+func (b *block) processTag(t *nameValue, c Column, i int, dataPointsLen int) {
        c.resizeValues(dataPointsLen)
-       c.valueType = t.valueType
-       c.values[i] = t.marshal()
+       c.ValueType = t.valueType
+       c.Values[i] = t.marshal()
 }
 
-func (b *block) resizeTagFamilies(tagFamiliesLen int) []columnFamily {
+func (b *block) resizeTagFamilies(tagFamiliesLen int) []ColumnFamily {
        tff := b.tagFamilies[:0]
        if n := tagFamiliesLen - cap(tff); n > 0 {
-               tff = append(tff[:cap(tff)], make([]columnFamily, n)...)
+               tff = append(tff[:cap(tff)], make([]ColumnFamily, n)...)
        }
        tff = tff[:tagFamiliesLen]
        b.tagFamilies = tff
@@ -153,32 +155,51 @@ func (b *block) mustWriteTo(sid common.SeriesID, bh 
*blockMetadata, sw *writers)
 
        // Marshal field
        f := b.field
-       cc := f.columns
+       cc := f.Columns
        chh := bh.field.resizeColumnMetadata(len(cc))
        for i := range cc {
                cc[i].mustWriteTo(&chh[i], &sw.fieldValuesWriter)
        }
 }
 
-func (b *block) marshalTagFamily(tf columnFamily, bh *blockMetadata, sw 
*writers) {
-       hw, w := sw.getColumnMetadataWriterAndColumnWriter(tf.name)
-       cc := tf.columns
+func (b *block) marshalTagFamily(tf ColumnFamily, bh *blockMetadata, sw 
*writers) {
+       hw, w := sw.getColumnMetadataWriterAndColumnWriter(tf.Name)
+       cc := tf.Columns
        cfh := getColumnFamilyMetadata()
        chh := cfh.resizeColumnMetadata(len(cc))
        for i := range cc {
                cc[i].mustWriteTo(&chh[i], w)
        }
        bb := longTermBufPool.Get()
+       defer longTermBufPool.Put(bb)
        bb.Buf = cfh.marshal(bb.Buf)
        putColumnFamilyMetadata(cfh)
-       tfh := bh.getTagFamilyMetadata(tf.name)
+       tfh := bh.getTagFamilyMetadata(tf.Name)
        tfh.offset = w.bytesWritten
        tfh.size = uint64(len(bb.Buf))
        if tfh.size > maxTagFamiliesMetadataSize {
                logger.Panicf("too big columnFamilyMetadataSize: %d bytes; 
mustn't exceed %d bytes", tfh.size, maxTagFamiliesMetadataSize)
        }
        hw.MustWrite(bb.Buf)
+}
+
+func (b *block) unmarshalTagFamily(decoder *encoding.BytesBlockDecoder, 
tfIndex int, name string, columnFamilyMetadataBlock *dataBlock, metaReader, 
valueReader fs.Reader) {
+       bb := longTermBufPool.Get()
+       bb.Buf = bytes.ResizeExact(bb.Buf, int(columnFamilyMetadataBlock.size))
+       fs.MustReadData(metaReader, int64(columnFamilyMetadataBlock.offset), 
bb.Buf)
+       cfm := getColumnFamilyMetadata()
+       defer putColumnFamilyMetadata(cfm)
+       _, err := cfm.unmarshal(bb.Buf)
+       if err != nil {
+               logger.Panicf("%s: cannot unmarshal columnFamilyMetadata: %v", 
metaReader.Path(), err)
+       }
        longTermBufPool.Put(bb)
+       tf := &b.tagFamilies[tfIndex]
+       tf.Name = name
+       cc := tf.resizeColumns(len(cfm.columnMetadata))
+       for i := range cc {
+               cc[i].mustReadValues(decoder, valueReader, cfm.columnMetadata[i], uint64(b.Len()))
+       }
+
 }
 
 func (b *block) uncompressedSizeBytes() uint64 {
@@ -189,10 +210,10 @@ func (b *block) uncompressedSizeBytes() uint64 {
        tff := b.tagFamilies
        for i := range tff {
                tf := &tff[i]
-               nameLen := uint64(len(tf.name))
-               for _, c := range tf.columns {
-                       nameLen += uint64(len(c.name))
-                       for _, v := range c.values {
+               nameLen := uint64(len(tf.Name))
+               for _, c := range tf.Columns {
+                       nameLen += uint64(len(c.Name))
+                       for _, v := range c.Values {
                                if len(v) > 0 {
                                        n += nameLen + uint64(len(v))
                                }
@@ -201,10 +222,10 @@ func (b *block) uncompressedSizeBytes() uint64 {
        }
 
        ff := b.field
-       for i := range ff.columns {
-               c := &ff.columns[i]
-               nameLen := uint64(len(c.name))
-               for _, v := range c.values {
+       for i := range ff.Columns {
+               c := &ff.Columns[i]
+               nameLen := uint64(len(c.Name))
+               for _, v := range c.Values {
                        if len(v) > 0 {
                                n += nameLen + uint64(len(v))
                        }
@@ -213,19 +234,50 @@ func (b *block) uncompressedSizeBytes() uint64 {
        return n
 }
 
+func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm 
*blockMetadata, opts *QueryOptions) {
+       b.reset()
+
+       b.timestamps = mustReadTimestampsFrom(b.timestamps, &bm.timestamps, 
int(bm.count), p.timestamps)
+
+       _ = b.resizeTagFamilies(len(bm.tagFamilies))
+       var i int
+       for name, block := range bm.tagFamilies {
+               b.unmarshalTagFamily(decoder, i, name, block, 
p.tagFamilyMetadata[name], p.tagFamilies[name])
+               i++
+       }
+       cc := b.field.resizeColumns(len(bm.field.columnMetadata))
+       for i := range cc {
+               cc[i].mustReadValues(decoder, p.fieldValues, bm.field.columnMetadata[i], bm.count)
+       }
+
+}
+
 func mustWriteTimestampsTo(th *timestampsMetadata, timestamps []int64, sw 
*writers) {
        th.reset()
 
        bb := longTermBufPool.Get()
+       defer longTermBufPool.Put(bb)
        bb.Buf, th.marshalType, th.min = encoding.Int64ListToBytes(bb.Buf[:0], 
timestamps)
        if len(bb.Buf) > maxTimestampsBlockSize {
-               logger.Panicf("BUG: too big block with timestamps: %d bytes; 
the maximum supported size is %d bytes", len(bb.Buf), maxTimestampsBlockSize)
+               logger.Panicf("too big block with timestamps: %d bytes; the 
maximum supported size is %d bytes", len(bb.Buf), maxTimestampsBlockSize)
        }
        th.max = timestamps[len(timestamps)-1]
        th.offset = sw.timestampsWriter.bytesWritten
        th.size = uint64(len(bb.Buf))
        sw.timestampsWriter.MustWrite(bb.Buf)
-       longTermBufPool.Put(bb)
+
+}
+
+func mustReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, 
reader fs.Reader) []int64 {
+       bb := longTermBufPool.Get()
+       defer longTermBufPool.Put(bb)
+       bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
+       fs.MustReadData(reader, int64(tm.offset), bb.Buf)
+       var err error
+       dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.marshalType, 
tm.min, count)
+       if err != nil {
+               logger.Panicf("%s: cannot unmarshal timestamps: %v", 
reader.Path(), err)
+       }
+       return dst
 }
 
 func getBlock() *block {
@@ -242,3 +294,97 @@ func putBlock(b *block) {
 }
 
 var blockPool sync.Pool
+
+type blockCursor struct {
+       idx int
+
+       timestamps []int64
+
+       tagFamilies []ColumnFamily
+
+       fields ColumnFamily
+
+       columnValuesDecoder encoding.BytesBlockDecoder
+       p                   *part
+       bm                  *blockMetadata
+       opts                *QueryOptions
+}
+
+func (bc *blockCursor) reset() {
+       bc.idx = 0
+       bc.p = nil
+       bc.bm = nil
+       bc.opts = nil
+
+       bc.timestamps = bc.timestamps[:0]
+
+       tff := bc.tagFamilies
+       for i := range tff {
+               tff[i].reset()
+       }
+       bc.tagFamilies = tff[:0]
+       bc.fields.reset()
+}
+
+func (bc *blockCursor) init(p *part, bm *blockMetadata, opts *QueryOptions) {
+       bc.reset()
+       bc.p = p
+       bc.bm = bm
+       bc.opts = opts
+}
+
+func (bc *blockCursor) loadData(tmpBlock *block) bool {
+       bc.reset()
+       tmpBlock.reset()
+       tmpBlock.mustReadFrom(&bc.columnValuesDecoder, bc.p, bc.bm, bc.opts)
+
+       start, end, ok := findRange(tmpBlock.timestamps, bc.opts.minTimestamp, 
bc.opts.maxTimestamp)
+       if !ok {
+               return false
+       }
+       bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[start:end]...)
+       bc.fields.Name = tmpBlock.field.Name
+       for _, c := range tmpBlock.field.Columns {
+               col := Column{Name: c.Name, ValueType: c.ValueType}
+               col.Values = append(col.Values, c.Values[start:end]...)
+               bc.fields.Columns = append(bc.fields.Columns, col)
+       }
+       for _, cf := range tmpBlock.tagFamilies {
+               tf := ColumnFamily{
+                       Name: cf.Name,
+               }
+               for _, c := range cf.Columns {
+                       col := Column{Name: c.Name, ValueType: c.ValueType}
+                       col.Values = append(col.Values, c.Values[start:end]...)
+                       tf.Columns = append(tf.Columns, col)
+               }
+               bc.tagFamilies = append(bc.tagFamilies, tf)
+       }
+       return true
+}
+
+func findRange(timestamps []int64, min int64, max int64) (int, int, bool) {
+       start, end := -1, -1
+
+       for i, t := range timestamps {
+               if t >= min && start == -1 {
+                       start = i
+               }
+               if t <= max {
+                       end = i
+               }
+       }
+
+       if start == -1 || end == -1 || start > end {
+               return 0, 0, false
+       }
+
+       // end is exclusive so callers can slice timestamps[start:end] directly.
+       return start, end + 1, true
+}
+
+var blockCursorPool sync.Pool
+
+func generateBlockCursor() *blockCursor {
+       v := blockCursorPool.Get()
+       if v == nil {
+               return &blockCursor{}
+       }
+       return v.(*blockCursor)
+}
+
+func releaseBlockCursor(bc *blockCursor) {
+       bc.reset()
+       blockCursorPool.Put(bc)
+}
diff --git a/banyand/measure/block_metadata.go 
b/banyand/measure/block_metadata.go
index b3c26cd9..a9f82721 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -18,9 +18,12 @@
 package measure
 
 import (
+       "errors"
+       "fmt"
        "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
 )
 
@@ -40,11 +43,26 @@ func (h *dataBlock) copyFrom(src *dataBlock) {
 }
 
 func (h *dataBlock) marshal(dst []byte) []byte {
-       dst = encoding.Uint64ToBytes(dst, h.offset)
-       dst = encoding.Uint64ToBytes(dst, h.size)
+       dst = encoding.VarUint64ToBytes(dst, h.offset)
+       dst = encoding.VarUint64ToBytes(dst, h.size)
        return dst
 }
 
+func (h *dataBlock) unmarshal(src []byte) ([]byte, error) {
+       src, n, err := encoding.BytesToVarUint64(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal offset: %w", err)
+       }
+       h.offset = n
+
+       src, n, err = encoding.BytesToVarUint64(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal size: %w", err)
+       }
+       h.size = n
+       return src, nil
+}
+
 type blockMetadata struct {
        seriesID common.SeriesID
 
@@ -99,7 +117,8 @@ func (bh *blockMetadata) marshal(dst []byte) []byte {
        dst = encoding.VarUint64ToBytes(dst, bh.count)
        dst = bh.timestamps.marshal(dst)
        dst = encoding.VarUint64ToBytes(dst, uint64(len(bh.tagFamilies)))
-       for _, cf := range bh.tagFamilies {
+       for name, cf := range bh.tagFamilies {
+               dst = encoding.EncodeBytes(dst, convert.StringToBytes(name))
                dst = cf.marshal(dst)
        }
        if len(bh.field.columnMetadata) > 0 {
@@ -108,6 +127,61 @@ func (bh *blockMetadata) marshal(dst []byte) []byte {
        return dst
 }
 
+func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) {
+       if len(src) < 8 {
+               return nil, errors.New("cannot unmarshal blockMetadata from 
less than 8 bytes")
+       }
+       bh.seriesID = common.SeriesID(encoding.BytesToUint64(src))
+       src = src[8:]
+       src, n, err := encoding.BytesToVarUint64(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: 
%w", err)
+       }
+       bh.uncompressedSizeBytes = n
+
+       src, n, err = encoding.BytesToVarUint64(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal count: %w", err)
+       }
+       bh.count = n
+       src, err = bh.timestamps.unmarshal(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal timestampsMetadata: 
%w", err)
+       }
+       src, n, err = encoding.BytesToVarUint64(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal tagFamilies count: 
%w", err)
+       }
+       var nameBytes []byte
+       for i := uint64(0); i < n; i++ {
+               src, nameBytes, err = encoding.DecodeBytes(src)
+               if err != nil {
+                       return nil, fmt.Errorf("cannot unmarshal tagFamily 
name: %w", err)
+               }
+               // TODO: cache dataBlock
+               tf := &dataBlock{}
+               src, err = tf.unmarshal(src)
+               if err != nil {
+                       return nil, fmt.Errorf("cannot unmarshal tagFamily 
dataBlock: %w", err)
+               }
+               bh.tagFamilies[convert.BytesToString(nameBytes)] = tf
+       }
+       if len(src) > 0 {
+               src, err = bh.field.unmarshal(src)
+               if err != nil {
+                       return nil, fmt.Errorf("cannot unmarshal 
columnFamilyMetadata: %w", err)
+               }
+       }
+       return src, nil
+}
+
+func (bh *blockMetadata) less(other *blockMetadata) bool {
+       if bh.seriesID == other.seriesID {
+               return bh.timestamps.min < other.timestamps.min
+       }
+       return bh.seriesID < other.seriesID
+}
+
 func getBlockMetadata() *blockMetadata {
        v := blockMetadataPool.Get()
        if v == nil {
@@ -151,3 +225,57 @@ func (th *timestampsMetadata) marshal(dst []byte) []byte {
        dst = append(dst, byte(th.marshalType))
        return dst
 }
+
+func (th *timestampsMetadata) unmarshal(src []byte) ([]byte, error) {
+       if len(src) < 33 {
+               return nil, errors.New("cannot unmarshal timestampsMetadata from less than 33 bytes")
+       }
+       th.offset = encoding.BytesToUint64(src)
+       src = src[8:]
+       th.size = encoding.BytesToUint64(src)
+       src = src[8:]
+       th.min = int64(encoding.BytesToUint64(src))
+       src = src[8:]
+       th.max = int64(encoding.BytesToUint64(src))
+       src = src[8:]
+       th.marshalType = encoding.EncodeType(src[0])
+       return src[1:], nil
+}
+
+func unmarshalBlockMetadata(dst []blockMetadata, src []byte) ([]blockMetadata, 
error) {
+       dstOrig := dst
+       for len(src) > 0 {
+               if len(dst) < cap(dst) {
+                       dst = dst[:len(dst)+1]
+               } else {
+                       dst = append(dst, blockMetadata{})
+               }
+               bm := &dst[len(dst)-1]
+               tail, err := bm.unmarshal(src)
+               if err != nil {
+                       return dstOrig, fmt.Errorf("cannot unmarshal 
blockMetadata entries: %w", err)
+               }
+               src = tail
+       }
+       if err := validateBlockHeaders(dst[len(dstOrig):]); err != nil {
+               return dstOrig, err
+       }
+       return dst, nil
+}
+
+func validateBlockHeaders(bhs []blockMetadata) error {
+       for i := 1; i < len(bhs); i++ {
+               bhCurr := &bhs[i]
+               bhPrev := &bhs[i-1]
+               if bhCurr.seriesID < bhPrev.seriesID {
+                       return fmt.Errorf("unexpected blockMetadata with 
smaller seriesID=%d after bigger seriesID=%d at position %d", &bhCurr.seriesID, 
&bhPrev.seriesID, i)
+               }
+               if bhCurr.seriesID != bhPrev.seriesID {
+                       continue
+               }
+               thCurr := bhCurr.timestamps
+               thPrev := bhPrev.timestamps
+               if thCurr.min < thPrev.min {
+                       return fmt.Errorf("unexpected blockMetadata with 
smaller timestamp=%d after bigger timestamp=%d at position %d", thCurr.min, 
thPrev.min, i)
+               }
+       }
+       return nil
+}
diff --git a/banyand/measure/field_flag_test.go 
b/banyand/measure/block_reader.go
similarity index 51%
rename from banyand/measure/field_flag_test.go
rename to banyand/measure/block_reader.go
index c47e457f..01358087 100644
--- a/banyand/measure/field_flag_test.go
+++ b/banyand/measure/block_reader.go
@@ -17,24 +17,28 @@
 
 package measure
 
-import (
-       "testing"
-       "time"
+import "github.com/apache/skywalking-banyandb/pkg/fs"
 
-       "github.com/stretchr/testify/assert"
+type reader struct {
+       r         fs.Reader
+       bytesRead uint64
+}
+
+func newReader(r fs.Reader) *reader {
+       return &reader{r: r}
+}
 
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-)
+func (r *reader) reset() {
+       r.r = nil
+       r.bytesRead = 0
+}
+
+func (r *reader) Path() string {
+       return r.r.Path()
+}
 
-func TestEncodeFieldFlag(t *testing.T) {
-       flag := pbv1.EncoderFieldFlag(&databasev1.FieldSpec{
-               EncodingMethod:    
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
-               CompressionMethod: 
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
-       }, time.Minute)
-       fieldSpec, interval, err := pbv1.DecodeFieldFlag(flag)
-       assert.NoError(t, err)
-       assert.Equal(t, databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, 
fieldSpec.EncodingMethod)
-       assert.Equal(t, databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, 
fieldSpec.CompressionMethod)
-       assert.Equal(t, time.Minute, interval)
+func (r *reader) Read(p []byte) (int, error) {
+       n, err := r.r.Read(0, p)
+       r.bytesRead += uint64(n)
+       return n, err
 }
diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go
index ee4f1fe7..994e18bd 100644
--- a/banyand/measure/block_writer.go
+++ b/banyand/measure/block_writer.go
@@ -235,12 +235,12 @@ func (bsw *blockWriter) MustWriteDataPoints(sid 
common.SeriesID, timestamps []in
        bsw.primaryBlockData = bh.marshal(bsw.primaryBlockData)
        putBlockMetadata(bh)
        if len(bsw.primaryBlockData) > maxUncompressedPrimaryBlockSize {
-               bsw.mustFlushIndexBlock(bsw.primaryBlockData)
+               bsw.mustFlushPrimaryBlock(bsw.primaryBlockData)
                bsw.primaryBlockData = bsw.primaryBlockData[:0]
        }
 }
 
-func (bsw *blockWriter) mustFlushIndexBlock(data []byte) {
+func (bsw *blockWriter) mustFlushPrimaryBlock(data []byte) {
        if len(data) > 0 {
                bsw.primaryBlockMetadata.mustWriteBlock(data, bsw.sidFirst, 
bsw.minTimestamp, bsw.maxTimestamp, &bsw.streamWriters)
                bsw.metaData = bsw.primaryBlockMetadata.marshal(bsw.metaData)
@@ -258,7 +258,7 @@ func (bsw *blockWriter) Flush(ph *partMetadata) {
        ph.MinTimestamp = bsw.totalMinTimestamp
        ph.MaxTimestamp = bsw.totalMaxTimestamp
 
-       bsw.mustFlushIndexBlock(bsw.primaryBlockData)
+       bsw.mustFlushPrimaryBlock(bsw.primaryBlockData)
 
        bb := longTermBufPool.Get()
        bb.Buf = zstd.Compress(bb.Buf[:0], bsw.metaData, 1)
@@ -271,17 +271,17 @@ func (bsw *blockWriter) Flush(ph *partMetadata) {
        bsw.reset()
 }
 
-func getBlockWriter() *blockWriter {
-       v := blockStreamWriterPool.Get()
+func generateBlockWriter() *blockWriter {
+       v := blockWriterPool.Get()
        if v == nil {
                return &blockWriter{}
        }
        return v.(*blockWriter)
 }
 
-func putBlockStreamWriter(bsw *blockWriter) {
+func releaseBlockWriter(bsw *blockWriter) {
        bsw.reset()
-       blockStreamWriterPool.Put(bsw)
+       blockWriterPool.Put(bsw)
 }
 
-var blockStreamWriterPool sync.Pool
+var blockWriterPool sync.Pool
diff --git a/banyand/measure/column.go b/banyand/measure/column.go
index 33e6ea86..b5502c6f 100644
--- a/banyand/measure/column.go
+++ b/banyand/measure/column.go
@@ -18,47 +18,48 @@
 package measure
 
 import (
-       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
-type column struct {
-       name      string
-       valueType storage.ValueType
-       values    [][]byte
+type Column struct {
+       Name      string
+       ValueType pbv1.ValueType
+       Values    [][]byte
 }
 
-func (c *column) reset() {
-       c.name = ""
+func (c *Column) reset() {
+       c.Name = ""
 
-       values := c.values
+       values := c.Values
        for i := range values {
                values[i] = nil
        }
-       c.values = values[:0]
+       c.Values = values[:0]
 }
 
-func (c *column) resizeValues(valuesLen int) [][]byte {
-       values := c.values
+func (c *Column) resizeValues(valuesLen int) [][]byte {
+       values := c.Values
        if n := valuesLen - cap(values); n > 0 {
                values = append(values[:cap(values)], make([][]byte, n)...)
        }
        values = values[:valuesLen]
-       c.values = values
+       c.Values = values
        return values
 }
 
-func (c *column) mustWriteTo(ch *columnMetadata, columnWriter *writer) {
+func (c *Column) mustWriteTo(ch *columnMetadata, columnWriter *writer) {
        ch.reset()
 
-       ch.name = c.name
-       ch.valueType = c.valueType
+       ch.name = c.Name
+       ch.valueType = c.ValueType
 
        // remove value type from values
-       for i := range c.values {
-               c.values[i] = c.values[i][1:]
+       for i := range c.Values {
+               c.Values[i] = c.Values[i][1:]
        }
 
        // TODO: encoding values based on value type
@@ -67,7 +68,7 @@ func (c *column) mustWriteTo(ch *columnMetadata, columnWriter 
*writer) {
        defer longTermBufPool.Put(bb)
 
        // marshal values
-       bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], c.values)
+       bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], c.Values)
        ch.size = uint64(len(bb.Buf))
        if ch.size > maxValuesBlockSize {
                logger.Panicf("too valuesSize: %d bytes; mustn't exceed %d 
bytes", ch.size, maxValuesBlockSize)
@@ -76,29 +77,48 @@ func (c *column) mustWriteTo(ch *columnMetadata, 
columnWriter *writer) {
        columnWriter.MustWrite(bb.Buf)
 }
 
+func (c *Column) mustReadValues(decoder *encoding.BytesBlockDecoder, reader 
fs.Reader, cm columnMetadata, count uint64) {
+       c.Name = cm.name
+       c.ValueType = cm.valueType
+
+       bb := longTermBufPool.Get()
+       defer longTermBufPool.Put(bb)
+       valuesSize := cm.size
+       if valuesSize > maxValuesBlockSize {
+               logger.Panicf("%s: block size cannot exceed %d bytes; got %d 
bytes", reader.Path(), maxValuesBlockSize, valuesSize)
+       }
+       bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize))
+       fs.MustReadData(reader, int64(cm.offset), bb.Buf)
+       var err error
+       c.Values, err = decoder.Decode(c.Values[:0], bb.Buf, count)
+       if err != nil {
+               logger.Panicf("%s: cannot decode values: %v", reader.Path(), 
err)
+       }
+}
+
 var longTermBufPool bytes.BufferPool
 
-type columnFamily struct {
-       name    string
-       columns []column
+type ColumnFamily struct {
+       Name    string
+       Columns []Column
 }
 
-func (cf *columnFamily) reset() {
-       cf.name = ""
+func (cf *ColumnFamily) reset() {
+       cf.Name = ""
 
-       columns := cf.columns
+       columns := cf.Columns
        for i := range columns {
                columns[i].reset()
        }
-       cf.columns = columns[:0]
+       cf.Columns = columns[:0]
 }
 
-func (cf *columnFamily) resizeColumns(columnsLen int) []column {
-       columns := cf.columns
+func (cf *ColumnFamily) resizeColumns(columnsLen int) []Column {
+       columns := cf.Columns
        if n := columnsLen - cap(columns); n > 0 {
-               columns = append(columns[:cap(columns)], make([]column, n)...)
+               columns = append(columns[:cap(columns)], make([]Column, n)...)
        }
        columns = columns[:columnsLen]
-       cf.columns = columns
+       cf.Columns = columns
        return columns
 }
diff --git a/banyand/measure/column_metadata.go 
b/banyand/measure/column_metadata.go
index 5896580d..a14f7c80 100644
--- a/banyand/measure/column_metadata.go
+++ b/banyand/measure/column_metadata.go
@@ -18,17 +18,18 @@
 package measure
 
 import (
+       "fmt"
        "sync"
 
-       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 type columnMetadata struct {
        dataBlock
        name      string
-       valueType storage.ValueType
+       valueType pbv1.ValueType
 }
 
 func (cm *columnMetadata) reset() {
@@ -51,6 +52,24 @@ func (cm *columnMetadata) marshal(dst []byte) []byte {
        return dst
 }
 
+func (cm *columnMetadata) unmarshal(src []byte) ([]byte, error) {
+       src, nameBytes, err := encoding.DecodeBytes(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal columnMetadata.name: 
%w", err)
+       }
+       cm.name = convert.BytesToString(nameBytes)
+       if len(src) < 1 {
+               return nil, fmt.Errorf("cannot unmarshal 
columnMetadata.valueType: src is too short")
+       }
+       cm.valueType = pbv1.ValueType(src[0])
+       src = src[1:]
+       src, err = cm.dataBlock.unmarshal(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal 
columnMetadata.dataBlock: %w", err)
+       }
+       return src, nil
+}
+
 type columnFamilyMetadata struct {
        columnMetadata []columnMetadata
 }
@@ -94,13 +113,28 @@ func (cfm *columnFamilyMetadata) 
resizeColumnMetadata(columnMetadataLen int) []c
 
 func (cfm *columnFamilyMetadata) marshal(dst []byte) []byte {
        cms := cfm.columnMetadata
-       dst = encoding.Uint64ToBytes(dst, uint64(len(cms)))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(cms)))
        for i := range cms {
                dst = cms[i].marshal(dst)
        }
        return dst
 }
 
+func (cfm *columnFamilyMetadata) unmarshal(src []byte) ([]byte, error) {
+       src, columnMetadataLen, err := encoding.BytesToVarInt64(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal columnMetadataLen: 
%w", err)
+       }
+       cms := cfm.resizeColumnMetadata(int(columnMetadataLen))
+       for i := range cms {
+               src, err = cms[i].unmarshal(src)
+               if err != nil {
+                       return nil, fmt.Errorf("cannot unmarshal columnMetadata 
%d: %w", i, err)
+               }
+       }
+       return src, nil
+}
+
 func getColumnFamilyMetadata() *columnFamilyMetadata {
        v := columnFamilyMetadataPool.Get()
        if v == nil {
diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go
index 44912cd6..d02f1065 100644
--- a/banyand/measure/datapoints.go
+++ b/banyand/measure/datapoints.go
@@ -23,13 +23,14 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        "github.com/pkg/errors"
 )
 
 type nameValue struct {
        name      string
-       valueType storage.ValueType
+       valueType pbv1.ValueType
        value     []byte
        valueArr  [][]byte
 }
@@ -51,7 +52,7 @@ func (n *nameValue) marshal() []byte {
        if n.valueArr != nil {
                var dst []byte
                for i := range n.valueArr {
-                       if n.valueType == storage.ValueTypeInt64Arr {
+                       if n.valueType == pbv1.ValueTypeInt64Arr {
                                dst = append(dst, n.valueArr[i]...)
                                continue
                        }
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 5a2dce55..718a1f56 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -18,7 +18,12 @@
 package measure
 
 import (
+       "container/heap"
+       "context"
+       "fmt"
        "io"
+       "sort"
+       "sync"
        "time"
 
        "github.com/pkg/errors"
@@ -29,7 +34,10 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
@@ -45,6 +53,7 @@ type Query interface {
 // Measure allows inspecting measure data points' details.
 type Measure interface {
        io.Closer
+       Query(ctx context.Context, opts pbv1.MeasureQueryOptions) 
(pbv1.MeasureQueryResult, error)
        Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
        CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, error)
        Shard(id common.ShardID) (tsdb.Shard, error)
@@ -53,6 +62,7 @@ type Measure interface {
        GetSchema() *databasev1.Measure
        GetIndexRules() []*databasev1.IndexRule
        GetInterval() time.Duration
+       GetShardNum() uint32
        SetSchema(schema *databasev1.Measure)
 }
 
@@ -66,6 +76,386 @@ func (s *measure) GetInterval() time.Duration {
        return s.interval
 }
 
+func (s *measure) GetShardNum() uint32 {
+       return s.shardNum
+}
+
// QueryResult implements pbv1.MeasureQueryResult and container/heap.Interface.
// It buffers one blockCursor per matched block and merges them on Pull.
type QueryResult struct {
	data           []*blockCursor // cursors for the current merge round
	originalData   []*blockCursor // NOTE(review): never assigned in this file — confirm whether it is dead
	lastIndex      int            // start of the current per-series group within the cursor list
	pws            []*partWrapper // part references released by Release
	shouldMergeAll bool           // merge every cursor at once instead of per series
	asc            bool           // ascending timestamp order inside the merge heap
}
+
+const round = time.Millisecond
+
// Pull loads the buffered block cursors and returns the next merged batch
// of data points, or nil when the result is exhausted. When shouldMergeAll
// is set, all cursors are merged in one batch; otherwise cursors are merged
// one seriesID group at a time, tracked by lastIndex.
func (qr *QueryResult) Pull() *pbv1.Result {
	if len(qr.data) == 0 {
		return nil
	}
	// TODO:// Parallel load
	tmpBlock := getBlock()
	defer putBlock(tmpBlock)
	// Load every cursor's block; drop cursors that produce no data.
	for i := 0; i < len(qr.data); i++ {
		if !qr.data[i].loadData(tmpBlock) {
			qr.data = append(qr.data[:i], qr.data[i+1:]...)
			i--
		}
	}
	if qr.shouldMergeAll {
		// A single cursor needs no heap merge — copy it wholesale.
		if len(qr.data) == 1 {
			r := &pbv1.Result{}
			bc := qr.data[0]
			bc.copyAllTo(r)
			return r
		}
		return qr.merge(round)
	}
	// Per-series mode: find the end of the current seriesID run, merge that
	// group, and remember where the next group starts.
	originalData := qr.data
	var lastSeriesID common.SeriesID
	for i := qr.lastIndex; i < len(originalData); i++ {
		bc := originalData[i]
		if lastSeriesID != 0 && lastSeriesID != bc.bm.seriesID {
			qr.data = qr.data[:0]
			qr.data = append(qr.data, originalData[qr.lastIndex:i]...)
			qr.lastIndex = i
			return qr.merge(round)
		}
		lastSeriesID = bc.bm.seriesID
	}
	// NOTE(review): when lastIndex points at the final cursor the condition
	// below is false and that cursor is never merged — this looks like an
	// off-by-one (`< len(originalData)` expected); confirm against callers.
	if qr.lastIndex < len(originalData)-1 {
		qr.data = originalData[qr.lastIndex:]
		return qr.merge(round)
	}
	return nil
}
+
+func (qr *QueryResult) Release() {
+       for i, v := range qr.data {
+               releaseBlockCursor(v)
+               qr.data[i] = nil
+       }
+       qr.data = qr.data[:0]
+       for i := range qr.pws {
+               qr.pws[i].decRef()
+       }
+       qr.pws = qr.pws[:0]
+}
+
+func (qr QueryResult) Len() int {
+       return len(qr.data)
+}
+
+func (qr QueryResult) Less(i, j int) bool {
+       if qr.asc {
+               return qr.data[i].timestamps[qr.data[i].idx] < 
qr.data[j].timestamps[qr.data[j].idx]
+       }
+       return qr.data[i].timestamps[qr.data[i].idx] > 
qr.data[j].timestamps[qr.data[j].idx]
+}
+
+func (qr QueryResult) Swap(i, j int) {
+       qr.data[i], qr.data[j] = qr.data[j], qr.data[i]
+}
+
+func (qr *QueryResult) Push(x interface{}) {
+       qr.data = append(qr.data, x.(*blockCursor))
+}
+
+func (qr *QueryResult) Pop() interface{} {
+       old := qr.data
+       n := len(old)
+       x := old[n-1]
+       qr.data = old[0 : n-1]
+       return x
+}
+
// merge drains the cursor heap in timestamp order, emitting at most one data
// point per `round`-sized time bucket; later points that round into an
// already-emitted bucket are skipped.
func (qr *QueryResult) merge(round time.Duration) *pbv1.Result {
	result := &pbv1.Result{}
	var lastOriginalTimestamp int64
	r := int64(round)

	for qr.Len() > 0 {
		bc := heap.Pop(qr).(*blockCursor)

		// Truncate the timestamp down to the rounding granularity.
		roundedTimestamp := (bc.timestamps[bc.idx] / r) * r

		// NOTE(review): copyTo appends the *raw* timestamp to the result,
		// yet the comparison below matches the *rounded* one against it —
		// the dedup branch only fires when the stored timestamp happens to
		// be round; confirm whether rounded timestamps should be stored.
		if len(result.Timestamps) > 0 && roundedTimestamp == result.Timestamps[len(result.Timestamps)-1] {
			// Same bucket as the previous point: first-seen values win.
			// NOTE(review): only the bookkeeping timestamp advances here;
			// the newer point's values are discarded — confirm intended.
			if bc.timestamps[bc.idx] > lastOriginalTimestamp {
				lastOriginalTimestamp = bc.timestamps[bc.idx]
			}
		} else {
			bc.copyTo(result)
			lastOriginalTimestamp = bc.timestamps[bc.idx]
		}

		bc.idx++

		// Re-queue the cursor while it still has data points left.
		if bc.idx < len(bc.timestamps) {
			heap.Push(qr, bc)
		}
	}

	return result
}
+
// QueryOptions extends the user-facing measure query options with the time
// range resolved to Unix-nanosecond bounds used for block filtering.
type QueryOptions struct {
	pbv1.MeasureQueryOptions
	minTimestamp int64 // TimeRange.Start in Unix nanoseconds
	maxTimestamp int64 // TimeRange.End in Unix nanoseconds
}
+
+func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) 
(pbv1.MeasureQueryResult, error) {
+       if mqo.TimeRange == nil || mqo.Entity == nil {
+               return nil, errors.New("invalid query options: timeRange and 
series are required")
+       }
+       tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable])
+       tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange)
+       sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, 
EntityValues: mqo.Entity}, mqo.Filter, mqo.Order)
+       if err != nil {
+               return nil, err
+       }
+       var sids []common.SeriesID
+       for i := range sl {
+               sids = append(sids, sl[i].ID)
+       }
+       var pws []*partWrapper
+       var parts []*part
+       qo := &QueryOptions{
+               MeasureQueryOptions: mqo,
+               minTimestamp:        mqo.TimeRange.Start.UnixNano(),
+               maxTimestamp:        mqo.TimeRange.End.UnixNano(),
+       }
+       for _, tw := range tabWrappers {
+               pws, parts = tw.Table().getParts(pws, parts, qo)
+       }
+       // TODO: cache tstIter
+       var tstIter tstIter
+       originalSids := make([]common.SeriesID, len(sids))
+       copy(originalSids, sids)
+       sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
+       tstIter.init(parts, sids, qo)
+       if tstIter.err != nil {
+               return nil, fmt.Errorf("cannot init tstIter: %w", tstIter.err)
+       }
+       var result QueryResult
+       for tstIter.NextBlock() {
+               bc := generateBlockCursor()
+               p := tstIter.piHeap[0]
+               bc.init(p.p, p.bm, qo)
+               result.data = append(result.data, bc)
+       }
+       if mqo.Order != nil {
+               if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
+                       result.asc = true
+               }
+               sidToIndex := make(map[common.SeriesID]int)
+               for i, si := range originalSids {
+                       sidToIndex[si] = i
+               }
+               sort.Slice(result.data, func(i, j int) bool {
+                       return sidToIndex[result.data[i].bm.seriesID] < 
sidToIndex[result.data[j].bm.seriesID]
+               })
+       } else {
+               result.asc = true
+       }
+       return &result, nil
+}
+
// tableIterator is an unfinished scaffold for iterating blocks across
// tsTables.
// NOTE(review): every method is a stub (nextBlock always returns false, init
// is commented out) and nothing in this file uses the type or its pool —
// consider removing it until the implementation lands.
type tableIterator struct {
	sids     []common.SeriesID
	tsTables []*tsTable
}

// reset is a placeholder; it clears nothing yet.
func (si *tableIterator) reset() {

}

// func (si *tableIterator) init(seriesTablesList []*seriesTables) {

// }

// nextBlock is a placeholder; it always reports no further blocks.
func (si *tableIterator) nextBlock() bool {
	return false
}

// generateTableIterator fetches a tableIterator from the pool, allocating a
// fresh one when the pool is empty.
func generateTableIterator() *tableIterator {
	v := tiPool.Get()
	if v == nil {
		return &tableIterator{}
	}
	return v.(*tableIterator)
}

// releaseTableIterator resets ti and returns it to the pool for reuse.
func releaseTableIterator(ti *tableIterator) {
	ti.reset()
	tiPool.Put(ti)
}

var tiPool sync.Pool
+
+func (bc *blockCursor) copyAllTo(r *pbv1.Result) {
+       r.Timestamps = bc.timestamps
+       for _, cf := range bc.tagFamilies {
+               tf := pbv1.TagFamily{
+                       Name: cf.Name,
+               }
+               for _, c := range cf.Columns {
+                       t := pbv1.Tag{
+                               Name: c.Name,
+                       }
+                       for _, v := range c.Values {
+                               t.Values = append(t.Values, 
mustDecodeTagValue(c.ValueType, v))
+                       }
+                       tf.Tags = append(tf.Tags, t)
+               }
+               r.TagFamilies = append(r.TagFamilies, tf)
+       }
+       for _, c := range bc.fields.Columns {
+               f := pbv1.Field{
+                       Name: c.Name,
+               }
+               for _, v := range c.Values {
+                       f.Values = append(f.Values, 
mustDecodeFieldValue(c.ValueType, v))
+               }
+               r.Fields = append(r.Fields, f)
+       }
+}
+
// copyTo appends the cursor's current row (the data point at bc.idx) to r,
// decoding tag and field values into their protobuf forms.
//
// The tag-family/field skeleton of r is created lazily on the first call and
// is afterwards assumed to line up 1:1 with this cursor's layout.
// NOTE(review): the length checks below only guard against a *different*
// number of families/fields; a same-length but differently-shaped cursor
// would be silently mis-merged — confirm callers only merge cursors of one
// series with identical schemas.
func (bc *blockCursor) copyTo(r *pbv1.Result) {
	r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx])
	if len(r.TagFamilies) != len(bc.tagFamilies) {
		// First call: build the empty family/tag skeleton.
		for _, cf := range bc.tagFamilies {
			tf := pbv1.TagFamily{
				Name: cf.Name,
			}
			for _, c := range cf.Columns {
				t := pbv1.Tag{
					Name: c.Name,
				}
				tf.Tags = append(tf.Tags, t)
			}
			r.TagFamilies = append(r.TagFamilies, tf)
		}
	}
	for i, cf := range bc.tagFamilies {
		for i2, c := range cf.Columns {
			r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.ValueType, c.Values[bc.idx]))
		}
	}

	if len(r.Fields) != len(bc.fields.Columns) {
		// First call: build the empty field skeleton.
		for _, c := range bc.fields.Columns {
			f := pbv1.Field{
				Name: c.Name,
			}
			r.Fields = append(r.Fields, f)
		}
	}
	for i, c := range bc.fields.Columns {
		r.Fields[i].Values = append(r.Fields[i].Values, mustDecodeFieldValue(c.ValueType, c.Values[bc.idx]))
	}
}
+
+func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) 
*modelv1.TagValue {
+       switch valueType {
+       case pbv1.ValueTypeInt64:
+               return &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Int{
+                               Int: &modelv1.Int{
+                                       Value: convert.BytesToInt64(value),
+                               },
+                       },
+               }
+       case pbv1.ValueTypeStr:
+               return &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Str{
+                               Str: &modelv1.Str{
+                                       Value: string(value),
+                               },
+                       },
+               }
+       case pbv1.ValueTypeBinaryData:
+               data := make([]byte, len(value))
+               copy(data, value)
+               return &modelv1.TagValue{
+                       Value: &modelv1.TagValue_BinaryData{
+                               BinaryData: data,
+                       },
+               }
+       case pbv1.ValueTypeInt64Arr:
+               var values []int64
+               for i := 0; i < len(value); i += 8 {
+                       values = append(values, 
convert.BytesToInt64(value[i:i+8]))
+               }
+               return &modelv1.TagValue{
+                       Value: &modelv1.TagValue_IntArray{
+                               IntArray: &modelv1.IntArray{
+                                       Value: values,
+                               }}}
+       case pbv1.ValueTypeStrArr:
+               var values []string
+               bb := longTermBufPool.Get()
+               var err error
+               for len(value) > 0 {
+                       bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], 
value)
+                       if err != nil {
+                               logger.Panicf("unmarshalVarArray failed: %v", 
err)
+                       }
+                       values = append(values, string(bb.Buf))
+               }
+               return &modelv1.TagValue{
+                       Value: &modelv1.TagValue_StrArray{
+                               StrArray: &modelv1.StrArray{
+                                       Value: values,
+                               }}}
+       default:
+               logger.Panicf("unsupported value type: %v", valueType)
+               return nil
+       }
+}
+
+func mustDecodeFieldValue(valueType pbv1.ValueType, value []byte) 
*modelv1.FieldValue {
+       switch valueType {
+       case pbv1.ValueTypeInt64:
+               return &modelv1.FieldValue{
+                       Value: &modelv1.FieldValue_Int{
+                               Int: &modelv1.Int{
+                                       Value: convert.BytesToInt64(value),
+                               },
+                       },
+               }
+       case pbv1.ValueTypeFloat64:
+               return &modelv1.FieldValue{
+                       Value: &modelv1.FieldValue_Float{
+                               Float: &modelv1.Float{
+                                       Value: convert.BytesToFloat64(value),
+                               },
+                       },
+               }
+       case pbv1.ValueTypeStr:
+               return &modelv1.FieldValue{
+                       Value: &modelv1.FieldValue_Str{
+                               Str: &modelv1.Str{
+                                       Value: string(value),
+                               },
+                       },
+               }
+       case pbv1.ValueTypeBinaryData:
+               data := make([]byte, len(value))
+               copy(data, value)
+               return &modelv1.FieldValue{
+                       Value: &modelv1.FieldValue_BinaryData{
+                               BinaryData: data,
+                       },
+               }
+       default:
+               logger.Panicf("unsupported value type: %v", valueType)
+               return nil
+       }
+}
+
 func (s *measure) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
        panic("implement me")
        // wrap := func(shards []tsdb.Shard) []tsdb.Shard {
@@ -111,10 +501,6 @@ func (s *measure) CompanionShards(metadata 
*commonv1.Metadata) ([]tsdb.Shard, er
        // return wrap(db.Shards()), nil
 }
 
-func formatMeasureCompanionPrefix(measureName, name string) string {
-       return measureName + "." + name
-}
-
 func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
        panic("implement me")
        // shard, err := s.databaseSupplier.SupplyTSDB().Shard(id)
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 15780b54..79263a1c 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -39,7 +39,6 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/flow"
        "github.com/apache/skywalking-banyandb/pkg/flow/streaming"
@@ -72,7 +71,7 @@ var (
 
 type dataPointWithEntityValues struct {
        *measurev1.DataPointValue
-       entityValues tsdb.EntityValues
+       entityValues []*modelv1.TagValue
 }
 
 type topNStreamingProcessor struct {
@@ -144,6 +143,8 @@ func (t *topNStreamingProcessor) writeStreamRecord(record 
flow.StreamRecord) err
        eventTime := t.downSampleTimeBucket(record.TimestampMillis())
        timeBucket := eventTime.Format(timeBucketFormat)
        var err error
+       publisher := t.pipeline.NewBatchPublisher()
+       defer publisher.Close()
        for group, tuples := range tuplesGroups {
                if e := t.l.Debug(); e.Enabled() {
                        e.Str("TopN", t.topNSchema.GetMetadata().GetName()).
@@ -154,13 +155,13 @@ func (t *topNStreamingProcessor) writeStreamRecord(record 
flow.StreamRecord) err
                for rankNum, tuple := range tuples {
                        fieldValue := tuple.V1.(int64)
                        data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
-                       err = multierr.Append(err, t.writeData(eventTime, 
timeBucket, fieldValue, group, data, rankNum))
+                       err = multierr.Append(err, t.writeData(publisher, 
eventTime, timeBucket, fieldValue, group, data, rankNum))
                }
        }
        return err
 }
 
-func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket 
string, fieldValue int64,
+func (t *topNStreamingProcessor) writeData(publisher queue.BatchPublisher, 
eventTime time.Time, timeBucket string, fieldValue int64,
        group string, data flow.Data, rankNum int,
 ) error {
        var tagValues []*modelv1.TagValue
@@ -170,7 +171,7 @@ func (t *topNStreamingProcessor) writeData(eventTime 
time.Time, timeBucket strin
                        return errors.New("fail to extract tag values from topN 
result")
                }
        }
-       entity, entityValues, shardID, err := t.locate(tagValues, rankNum)
+       series, shardID, err := t.locate(tagValues, rankNum)
        if err != nil {
                return err
        }
@@ -197,7 +198,7 @@ func (t *topNStreamingProcessor) writeData(eventTime 
time.Time, timeBucket strin
                                                                        },
                                                                },
                                                        },
-                                               }, 
data[0].(tsdb.EntityValues)...),
+                                               }, 
data[0].([]*modelv1.TagValue)...),
                                        },
                                },
                                Fields: []*modelv1.FieldValue{
@@ -211,14 +212,13 @@ func (t *topNStreamingProcessor) writeData(eventTime 
time.Time, timeBucket strin
                                },
                        },
                },
-               ShardId:    uint32(shardID),
-               SeriesHash: tsdb.HashEntity(entity),
+               ShardId: uint32(shardID),
        }
        if t.l.Debug().Enabled() {
-               iwr.EntityValues = entityValues.Encode()
+               iwr.EntityValues = series.EntityValues
        }
-       message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
-       _, errWritePub := t.pipeline.Publish(apiData.TopicMeasureWrite, message)
+       message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), 
"local", iwr)
+       _, errWritePub := publisher.Publish(apiData.TopicMeasureWrite, message)
        return errWritePub
 }
 
@@ -226,34 +226,49 @@ func (t *topNStreamingProcessor) 
downSampleTimeBucket(eventTimeMillis int64) tim
        return time.UnixMilli(eventTimeMillis - 
eventTimeMillis%t.interval.Milliseconds())
 }
 
-func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum 
int) (tsdb.Entity, tsdb.EntityValues, common.ShardID, error) {
+func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum 
int) (*pbv1.Series, common.ShardID, error) {
        if len(tagValues) != 0 && len(t.topNSchema.GetGroupByTagNames()) != 
len(tagValues) {
-               return nil, nil, 0, errors.New("no enough tag values for the 
entity")
+               return nil, 0, errors.New("no enough tag values for the entity")
        }
+       // Subject: topN aggregation Name
+       //
        // entity prefix
-       // 1) source measure Name + topN aggregation Name
-       // 2) sort direction
-       // 3) rank number
-       // >4) group tag values if needed
-       entity := make(tsdb.EntityValues, 1+1+1+len(tagValues))
+       //
+       // 1) sort direction
+       // 2) rank number
+       // >3) group tag values if needed
+       series := &pbv1.Series{
+               Subject:      t.topNSchema.GetMetadata().GetName(),
+               EntityValues: make([]*modelv1.TagValue, 1+1+len(tagValues)),
+       }
+
        // entity prefix
-       entity[0] = 
tsdb.StrValue(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(),
-               t.topNSchema.GetMetadata().GetName()))
-       entity[1] = tsdb.Int64Value(int64(t.sortDirection.Number()))
-       entity[2] = tsdb.Int64Value(int64(rankNum))
+       series.EntityValues[0] = &modelv1.TagValue{
+               Value: &modelv1.TagValue_Int{
+                       Int: &modelv1.Int{
+                               Value: int64(t.sortDirection),
+                       },
+               },
+       }
+       series.EntityValues[1] = &modelv1.TagValue{
+               Value: &modelv1.TagValue_Int{
+                       Int: &modelv1.Int{
+                               Value: int64(rankNum),
+                       },
+               },
+       }
        // measureID as sharding key
        for idx, tagVal := range tagValues {
-               entity[idx+3] = tagVal
+               series.EntityValues[idx+3] = tagVal
        }
-       e, err := entity.ToEntity()
-       if err != nil {
-               return nil, nil, 0, err
+       if err := series.Marshal(); err != nil {
+               return nil, 0, fmt.Errorf("fail to marshal series: %w", err)
        }
-       id, err := partition.ShardID(e.Marshal(), t.m.shardNum)
+       id, err := partition.ShardID(series.Buffer, t.m.shardNum)
        if err != nil {
-               return nil, nil, 0, err
+               return nil, 0, err
        }
-       return e, entity, common.ShardID(id), nil
+       return series, common.ShardID(id), nil
 }
 
 func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 94ffbe40..eceaf565 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -25,13 +25,13 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -49,10 +49,11 @@ func setUpWriteCallback(l *logger.Logger, schemaRepo 
*schemaRepo) bus.MessageLis
 
 func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent 
*measurev1.InternalWriteRequest) (map[string]*dataPointsInGroup, error) {
        req := writeEvent.Request
-       t := req.DataPoint.Timestamp.AsTime().Local()
-       if err := timestamp.Check(t); err != nil {
+       tp := req.DataPoint.Timestamp.AsTime().Local()
+       if err := timestamp.Check(tp); err != nil {
                return nil, fmt.Errorf("invalid timestamp: %s", err)
        }
+       t := timestamp.MToN(tp)
        ts := uint64(t.UnixNano())
 
        gn := req.Metadata.Group
@@ -101,9 +102,9 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        if fLen > len(stm.schema.GetTagFamilies()) {
                return nil, fmt.Errorf("%s has more tag families than 
expected", req.Metadata)
        }
-       series, err := tsdb.Register(shardID, &storage.Series{
+       series, err := tsdb.Register(shardID, &pbv1.Series{
                Subject:      req.Metadata.Name,
-               EntityValues: writeEvent.EntityValues,
+               EntityValues: writeEvent.EntityValues[1:],
        })
        if err != nil {
                return nil, fmt.Errorf("cannot register series: %w", err)
@@ -199,7 +200,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp 
bus.Message) {
                g := groups[i]
                for j := range g.tables {
                        dps := g.tables[j]
-                       dps.tsTable.Table().mustAddRows(dps.dataPoints)
+                       dps.tsTable.Table().mustAddDataPoints(dps.dataPoints)
                        dps.tsTable.DecRef()
                }
                g.tsdb.IndexDB().Write(g.docs)
@@ -215,16 +216,16 @@ func encodeFieldValue(name string, fieldValue 
*modelv1.FieldValue) *nameValue {
        nv := &nameValue{name: name}
        switch fieldValue.GetValue().(type) {
        case *modelv1.FieldValue_Int:
-               nv.valueType = storage.ValueTypeInt64
+               nv.valueType = pbv1.ValueTypeInt64
                nv.value = convert.Int64ToBytes(fieldValue.GetInt().GetValue())
        case *modelv1.FieldValue_Float:
-               nv.valueType = storage.ValueTypeFloat64
+               nv.valueType = pbv1.ValueTypeFloat64
                nv.value = 
convert.Float64ToBytes(fieldValue.GetFloat().GetValue())
        case *modelv1.FieldValue_Str:
-               nv.valueType = storage.ValueTypeStr
+               nv.valueType = pbv1.ValueTypeStr
                nv.value = []byte(fieldValue.GetStr().GetValue())
        case *modelv1.FieldValue_BinaryData:
-               nv.valueType = storage.ValueTypeBinaryData
+               nv.valueType = pbv1.ValueTypeBinaryData
                nv.value = bytes.Clone(fieldValue.GetBinaryData())
        }
        return nv
@@ -234,13 +235,13 @@ func encodeTagValue(name string, tagValue 
*modelv1.TagValue) *nameValue {
        nv := &nameValue{name: name}
        switch tagValue.GetValue().(type) {
        case *modelv1.TagValue_Int:
-               nv.valueType = storage.ValueTypeInt64
+               nv.valueType = pbv1.ValueTypeInt64
                nv.value = convert.Int64ToBytes(tagValue.GetInt().GetValue())
        case *modelv1.TagValue_Str:
-               nv.valueType = storage.ValueTypeStr
+               nv.valueType = pbv1.ValueTypeStr
                nv.value = []byte(tagValue.GetStr().GetValue())
        case *modelv1.TagValue_BinaryData:
-               nv.valueType = storage.ValueTypeBinaryData
+               nv.valueType = pbv1.ValueTypeBinaryData
                nv.value = bytes.Clone(tagValue.GetBinaryData())
        case *modelv1.TagValue_IntArray:
                nv.valueArr = make([][]byte, len(tagValue.GetIntArray().Value))
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 80085662..dd211449 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -28,6 +28,41 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/fs"
 )
 
+// part is a read-only view over one part's data, exposing the readers
+// (files or in-memory buffers) that back it.
+type part struct {
+       partMetadata partMetadata
+
+       // primaryBlockMetadata indexes the primary stream, one entry per
+       // compressed run of blockMetadata.
+       primaryBlockMetadata []primaryBlockMetadata
+
+       meta              fs.Reader
+       primary           fs.Reader
+       tagFamilyMetadata map[string]fs.Reader
+       tagFamilies       map[string]fs.Reader
+       timestamps        fs.Reader
+       fieldValues       fs.Reader
+}
+
+// openMemPart wraps a memPart into a read-only part without copying the
+// underlying buffers.
+func openMemPart(mp *memPart) *part {
+       var p part
+       p.partMetadata = mp.partMetadata
+
+       // Decode the primary block index from the in-memory meta buffer.
+       p.primaryBlockMetadata = 
mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], newReader(&mp.meta))
+
+       // Open data files
+       p.meta = &mp.meta
+       p.primary = &mp.primary
+       p.timestamps = &mp.timestamps
+       p.fieldValues = &mp.fieldValues
+       if mp.tagFamilies != nil {
+               p.tagFamilies = make(map[string]fs.Reader)
+               p.tagFamilyMetadata = make(map[string]fs.Reader)
+               for name, tf := range mp.tagFamilies {
+                       p.tagFamilies[name] = tf
+                       p.tagFamilyMetadata[name] = mp.tagFamilyMetadata[name]
+               }
+       }
+       return &p
+}
+
 type memPart struct {
        partMetadata partMetadata
 
@@ -83,7 +118,7 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) {
 
        sort.Sort(dps)
 
-       bsw := getBlockWriter()
+       bsw := generateBlockWriter()
        bsw.MustInitForMemPart(mp)
        var sidPrev common.SeriesID
        uncompressedBlockSizeBytes := uint64(0)
@@ -104,7 +139,7 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) {
        }
        bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:], 
dps.tagFamilies[indexPrev:], dps.fields[indexPrev:])
        bsw.Flush(&mp.partMetadata)
-       putBlockStreamWriter(bsw)
+       releaseBlockWriter(bsw)
 }
 
 func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 {
@@ -141,10 +176,11 @@ type partWrapper struct {
        ref int32
 
        mp *memPart
+       p  *part
 }
 
-func newMemPartWrapper(mp *memPart) *partWrapper {
-       return &partWrapper{mp: mp, ref: 1}
+// newMemPartWrapper wraps an in-memory part and its read-only view with an
+// initial reference count of 1.
+func newMemPartWrapper(mp *memPart, p *part) *partWrapper {
+       return &partWrapper{mp: mp, p: p, ref: 1}
 }
 
 func (pw *partWrapper) incRef() {
diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go
new file mode 100644
index 00000000..be1e7a9a
--- /dev/null
+++ b/banyand/measure/part_iter.go
@@ -0,0 +1,258 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "fmt"
+       "io"
+       "os"
+       "sort"
+       "strings"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// partIter iterates over the blocks of a single part that belong to a given
+// set of series IDs and overlap the query time range.
+type partIter struct {
+       // bm is the metadata of the block the iterator is positioned at.
+       bm *blockMetadata
+
+       p *part
+
+       // sids is the list of series IDs to search for; the scan assumes it
+       // is sorted in ascending order.
+       sids []common.SeriesID
+
+       sidIdx int
+
+       minTimestamp int64
+       maxTimestamp int64
+
+       // primaryBlockMetadata is the not-yet-visited tail of the part's
+       // primary block index.
+       primaryBlockMetadata []primaryBlockMetadata
+
+       // bms holds the blockMetadata entries decoded from the current
+       // primary block.
+       bms []blockMetadata
+
+       // Scratch buffers, reused across readPrimaryBlock calls.
+       compressedPrimaryBuf []byte
+       indexBuf             []byte
+
+       err error
+}
+
+// reset returns the iterator to its zero state while keeping the scratch
+// buffers' capacity for reuse.
+func (ps *partIter) reset() {
+       ps.bm = nil
+       ps.p = nil
+       ps.sids = nil
+       ps.sidIdx = 0
+       ps.primaryBlockMetadata = nil
+       ps.bms = nil
+       ps.compressedPrimaryBuf = ps.compressedPrimaryBuf[:0]
+       ps.indexBuf = ps.indexBuf[:0]
+       ps.err = nil
+}
+
+// isInTest reports whether this binary was built by `go test` (its path ends
+// with ".test"). NOTE(review): not referenced anywhere in this file —
+// presumably intended for debug-only checks; confirm it is actually needed.
+var isInTest = func() bool {
+       return strings.HasSuffix(os.Args[0], ".test")
+}()
+
+// init prepares the iterator to scan part p for the given series IDs within
+// the query time range and positions it at the first requested series ID.
+func (ps *partIter) init(p *part, sids []common.SeriesID, opt *QueryOptions) {
+       ps.reset()
+       ps.p = p
+
+       ps.sids = sids
+       ps.minTimestamp = opt.minTimestamp
+       ps.maxTimestamp = opt.maxTimestamp
+
+       ps.primaryBlockMetadata = p.primaryBlockMetadata
+
+       // NOTE(review): reset() leaves ps.bm nil and nextSeriesID() writes
+       // ps.bm.seriesID — verify ps.bm is allocated before this call or the
+       // first init with a non-empty sids panics.
+       ps.nextSeriesID()
+}
+
+// NextBlock advances to the next matching block, loading further primary
+// blocks on demand. It returns false when iteration is finished or an error
+// occurred; use Error() to distinguish the two.
+func (ps *partIter) NextBlock() bool {
+       for {
+               if ps.err != nil {
+                       return false
+               }
+               // Refill the decoded block metadata when the current primary
+               // block is exhausted.
+               if len(ps.bms) == 0 {
+                       if !ps.nextBHS() {
+                               return false
+                       }
+               }
+               if ps.searchBHS() {
+                       return true
+               }
+       }
+}
+
+// Error returns the last error. io.EOF marks normal exhaustion of the
+// iterator and is reported as nil.
+func (ps *partIter) Error() error {
+       if ps.err == io.EOF {
+               return nil
+       }
+       return ps.err
+}
+
+// nextSeriesID advances to the next series ID to search for, recording it in
+// ps.bm. It returns false and sets ps.err to io.EOF when the ID list is
+// exhausted.
+func (ps *partIter) nextSeriesID() bool {
+       if ps.sidIdx >= len(ps.sids) {
+               ps.err = io.EOF
+               return false
+       }
+       // ps.bm is nil right after reset()/init(); allocate a scratch
+       // blockMetadata so the first assignment below does not panic with a
+       // nil-pointer dereference.
+       if ps.bm == nil {
+               ps.bm = &blockMetadata{}
+       }
+       ps.bm.seriesID = ps.sids[ps.sidIdx]
+       ps.sidIdx++
+       return true
+}
+
+// skipSeriesIDsSmallerThan positions the iterator at the first requested
+// series ID that is >= sid and stores it in ps.bm. It returns false and sets
+// ps.err to io.EOF when no such ID remains.
+func (ps *partIter) skipSeriesIDsSmallerThan(sid common.SeriesID) bool {
+       sids := ps.sids[ps.sidIdx:]
+       // Locate the first remaining ID that is >= sid. The previous
+       // predicate (sid < sids[i]) also skipped an ID exactly equal to sid,
+       // silently dropping the blocks of a queried series.
+       ps.sidIdx += sort.Search(len(sids), func(i int) bool {
+               return sid <= sids[i]
+       })
+       if ps.sidIdx >= len(ps.sids) {
+               ps.sidIdx = len(ps.sids)
+               ps.err = io.EOF
+               return false
+       }
+       // Defensive: ps.bm may point at consumed metadata; never nil here in
+       // practice because init() runs nextSeriesID() first.
+       if ps.bm == nil {
+               ps.bm = &blockMetadata{}
+       }
+       ps.bm.seriesID = ps.sids[ps.sidIdx]
+       ps.sidIdx++
+       return true
+}
+
+// nextBHS loads the next primary block that may contain the requested series
+// IDs within the query time range, decoding its blockMetadata entries into
+// ps.bms. It returns false on error or when the metaindex is exhausted.
+func (ps *partIter) nextBHS() bool {
+       for len(ps.primaryBlockMetadata) > 0 {
+               // Align the current series ID with the first remaining
+               // metaindex row, then drop rows that end before it.
+               if 
!ps.skipSeriesIDsSmallerThan(ps.primaryBlockMetadata[0].seriesID) {
+                       return false
+               }
+               ps.primaryBlockMetadata = 
skipSmallMetaindexRows(ps.primaryBlockMetadata, ps.bm.seriesID)
+
+               mr := &ps.primaryBlockMetadata[0]
+               ps.primaryBlockMetadata = ps.primaryBlockMetadata[1:]
+               if ps.bm.seriesID < mr.seriesID {
+                       logger.Panicf("BUG: invariant violation: 
ps.BlockRef.bh.TSID cannot be smaller than mr.TSID; got %+v vs %+v", 
&ps.bm.seriesID, &mr.seriesID)
+               }
+
+               // Skip primary blocks entirely outside the query time range.
+               if mr.maxTimestamp < ps.minTimestamp || mr.minTimestamp > 
ps.maxTimestamp {
+                       continue
+               }
+
+               bm, err := ps.readPrimaryBlock(mr)
+               if err != nil {
+                       ps.err = fmt.Errorf("cannot read index block for part 
%q at offset %d with size %d: %w",
+                               &ps.p.partMetadata, mr.offset, mr.size, err)
+                       return false
+               }
+               ps.bms = bm
+               return true
+       }
+
+       // No more metaindex rows to search.
+       ps.err = io.EOF
+       return false
+}
+
+func skipSmallMetaindexRows(pbmIndex []primaryBlockMetadata, sid 
common.SeriesID) []primaryBlockMetadata {
+       if sid < pbmIndex[0].seriesID {
+               logger.Panicf("BUG: invariant violation: tsid cannot be smaller 
than metaindex[0]; got %d vs %d", sid, &pbmIndex[0].seriesID)
+       }
+
+       if sid == pbmIndex[0].seriesID {
+               return pbmIndex
+       }
+
+       n := sort.Search(len(pbmIndex), func(i int) bool {
+               return sid > pbmIndex[i].seriesID
+       })
+       if n == 0 {
+               logger.Panicf("BUG: invariant violation: sort.Search returned 0 
for tsid > metaindex[0].TSID; tsid=%+v; metaindex[0].TSID=%+v",
+                       sid, &pbmIndex[0].seriesID)
+       }
+       return pbmIndex[n-1:]
+}
+
+// readPrimaryBlock reads, decompresses and unmarshals the blockMetadata
+// entries of the primary block described by mr, reusing the iterator's
+// scratch buffers to avoid per-call allocations.
+func (ps *partIter) readPrimaryBlock(mr *primaryBlockMetadata) 
([]blockMetadata, error) {
+       ps.compressedPrimaryBuf = bytes.ResizeOver(ps.compressedPrimaryBuf, 
int(mr.size))
+       fs.MustReadData(ps.p.primary, int64(mr.offset), ps.compressedPrimaryBuf)
+
+       var err error
+       ps.indexBuf, err = zstd.Decompress(ps.indexBuf[:0], 
ps.compressedPrimaryBuf)
+       if err != nil {
+               return nil, fmt.Errorf("cannot decompress index block: %w", err)
+       }
+       // TODO: cache the decoded blockMetadata to avoid re-parsing hot blocks.
+       bm := make([]blockMetadata, 0)
+       bm, err = unmarshalBlockMetadata(bm, ps.indexBuf)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal index block: %w", err)
+       }
+       return bm, nil
+}
+
+// searchBHS scans the decoded block metadata entries (ps.bms) for the next
+// block that belongs to the current series ID and overlaps the query time
+// range. On success ps.bm points at the matching block and true is returned.
+func (ps *partIter) searchBHS() bool {
+       bhs := ps.bms
+       for len(bhs) > 0 {
+               // Skip block headers with series IDs smaller than the given sid.
+               sid := ps.bm.seriesID
+               if bhs[0].seriesID < sid {
+                       // bhs is sorted by seriesID, so sort.Search's predicate
+                       // must be monotonically false-then-true: locate the
+                       // first entry whose seriesID is >= sid. The previous
+                       // predicate (sid > seriesID) was inverted and broke
+                       // the sort.Search contract.
+                       n := sort.Search(len(bhs), func(i int) bool {
+                               return sid <= bhs[i].seriesID
+                       })
+                       if n == len(bhs) {
+                               // Nothing found.
+                               break
+                       }
+                       bhs = bhs[n:]
+               }
+               bh := &bhs[0]
+
+               // Invariant: sid <= bh.seriesID
+
+               if bh.seriesID != sid {
+                       // sid < bh.seriesID: no more blocks with the given sid.
+                       // Proceed to the next (bigger) sid.
+                       if !ps.skipSeriesIDsSmallerThan(bh.seriesID) {
+                               return false
+                       }
+                       continue
+               }
+
+               // Found a block with the given sid. Verify its timestamp range.
+               // While blocks for the same series ID are sorted by minimum
+               // timestamp, they may contain overlapping time ranges.
+               // So use linear search instead of binary search.
+               if bh.timestamps.max < ps.minTimestamp {
+                       // Skip the block with too small timestamps.
+                       bhs = bhs[1:]
+                       continue
+               }
+               if bh.timestamps.min > ps.maxTimestamp {
+                       // Proceed to the next sid, since the remaining blocks
+                       // for the current sid contain too big timestamps.
+                       if !ps.nextSeriesID() {
+                               return false
+                       }
+                       continue
+               }
+
+               // Found the block with the matching series ID and time range.
+               ps.bm = bh
+
+               ps.bms = bhs[1:]
+               return true
+       }
+       ps.bms = nil
+       return false
+}
diff --git a/banyand/measure/primary_metadata.go 
b/banyand/measure/primary_metadata.go
index 1cb5cac6..4063caf2 100644
--- a/banyand/measure/primary_metadata.go
+++ b/banyand/measure/primary_metadata.go
@@ -18,9 +18,13 @@
 package measure
 
 import (
+       "fmt"
+       "io"
+
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type primaryBlockMetadata struct {
@@ -52,7 +56,6 @@ func (ph *primaryBlockMetadata) mustWriteBlock(data []byte, 
sidFirst common.Seri
        longTermBufPool.Put(bb)
 }
 
-// marshal appends marshaled ih to dst and returns the result.
 func (ph *primaryBlockMetadata) marshal(dst []byte) []byte {
        dst = ph.seriesID.AppendToBytes(dst)
        dst = encoding.Uint64ToBytes(dst, uint64(ph.minTimestamp))
@@ -61,3 +64,69 @@ func (ph *primaryBlockMetadata) marshal(dst []byte) []byte {
        dst = encoding.Uint64ToBytes(dst, ph.size)
        return dst
 }
+
+// unmarshal decodes a primaryBlockMetadata from src — five fixed 8-byte
+// big-endian fields (seriesID, minTimestamp, maxTimestamp, offset, size),
+// 40 bytes total — and returns the remaining tail of src.
+func (ph *primaryBlockMetadata) unmarshal(src []byte) ([]byte, error) {
+       if len(src) < 40 {
+               return nil, fmt.Errorf("cannot unmarshal primaryBlockMetadata 
from %d bytes; expect at least 40 bytes", len(src))
+       }
+       ph.seriesID = common.SeriesID(encoding.BytesToUint64(src))
+       src = src[8:]
+       ph.minTimestamp = int64(encoding.BytesToUint64(src))
+       src = src[8:]
+       ph.maxTimestamp = int64(encoding.BytesToUint64(src))
+       src = src[8:]
+       ph.offset = encoding.BytesToUint64(src)
+       src = src[8:]
+       ph.size = encoding.BytesToUint64(src)
+       return src[8:], nil
+}
+
+// mustReadPrimaryBlockMetadata reads the whole zstd-compressed meta stream
+// from r, appends the decoded primaryBlockMetadata entries to dst and
+// returns it. It panics on any read, decompress or parse failure.
+func mustReadPrimaryBlockMetadata(dst []primaryBlockMetadata, r *reader) 
[]primaryBlockMetadata {
+       data, err := io.ReadAll(r)
+       if err != nil {
+               logger.Panicf("cannot read primaryBlockMetadata entries from 
%s: %s", r.Path(), err)
+       }
+
+       // Borrow a pooled buffer for the decompressed bytes; returned below.
+       bb := longTermBufPool.Get()
+       bb.Buf, err = zstd.Decompress(bb.Buf[:0], data)
+       if err != nil {
+               logger.Panicf("cannot decompress indexBlockHeader entries from 
%s: %s", r.Path(), err)
+       }
+       dst, err = unmarshalPrimaryBlockMetadata(dst, bb.Buf)
+       longTermBufPool.Put(bb)
+       if err != nil {
+               logger.Panicf("cannot parse indexBlockHeader entries from %s: 
%s", r.Path(), err)
+       }
+       return dst
+}
+
+// unmarshalPrimaryBlockMetadata appends unmarshaled from src indexBlockHeader 
entries to dst and returns the result.
+func unmarshalPrimaryBlockMetadata(dst []primaryBlockMetadata, src []byte) 
([]primaryBlockMetadata, error) {
+       dstOrig := dst
+       for len(src) > 0 {
+               // Reuse a slot from dst's spare capacity before growing it.
+               if len(dst) < cap(dst) {
+                       dst = dst[:len(dst)+1]
+               } else {
+                       dst = append(dst, primaryBlockMetadata{})
+               }
+               ih := &dst[len(dst)-1]
+               tail, err := ih.unmarshal(src)
+               if err != nil {
+                       return dstOrig, fmt.Errorf("cannot unmarshal 
indexBlockHeader %d: %w", len(dst)-len(dstOrig), err)
+               }
+               src = tail
+       }
+       // Only the entries appended by this call are validated.
+       if err := validatePrimaryBlockMetadata(dst[len(dstOrig):]); err != nil {
+               return dstOrig, err
+       }
+       return dst, nil
+}
+
+// validatePrimaryBlockMetadata verifies that the entries are sorted by
+// series ID in ascending order.
+func validatePrimaryBlockMetadata(ihs []primaryBlockMetadata) error {
+       for i := 1; i < len(ihs); i++ {
+               if ihs[i].seriesID < ihs[i-1].seriesID {
+                       // seriesID is an integer type; format the values with
+                       // %d — %s on a pointer to a non-Stringer would render
+                       // as "%!s(*common.SeriesID=...)".
+                       return fmt.Errorf("unexpected primaryBlockMetadata with 
smaller seriesID=%d after bigger seriesID=%d", ihs[i].seriesID, 
ihs[i-1].seriesID)
+               }
+       }
+       return nil
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 5906742d..35b96d02 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "container/heap"
        "fmt"
        "io"
        "path"
@@ -229,17 +230,171 @@ func (tst *tsTable) Close() error {
        panic("implement me")
 }
 
-func (tst *tsTable) mustAddRows(dps *dataPoints) {
+// mustAddDataPoints converts the pending data points into an in-memory part
+// and registers it with the table. Empty input is a no-op.
+func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
        if len(dps.seriesIDs) == 0 {
                return
        }
 
        mp := getMemPart()
        mp.mustInitFromDataPoints(dps)
+       // Build the read-only view once so queries can scan this part.
+       p := openMemPart(mp)
 
-       pw := newMemPartWrapper(mp)
+       pw := newMemPartWrapper(mp, p)
 
        tst.Lock()
        defer tst.Unlock()
        tst.memParts = append(tst.memParts, pw)
 }
+
+// getParts appends the parts whose time range overlaps the query to dst and
+// dstPart. A reference is taken on every returned wrapper; callers must
+// release it when done.
+func (tst *tsTable) getParts(dst []*partWrapper, dstPart []*part, opts 
*QueryOptions) ([]*partWrapper, []*part) {
+       tst.RLock()
+       defer tst.RUnlock()
+       for _, p := range tst.memParts {
+               pm := p.mp.partMetadata
+               // Skip parts entirely outside the queried time range.
+               if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp > 
pm.MaxTimestamp {
+                       continue
+               }
+               p.incRef()
+               dst = append(dst, p)
+               dstPart = append(dstPart, p.p)
+       }
+       return dst, dstPart
+}
+
+// tstIter merges the block streams of multiple parts, using a min-heap of
+// per-part iterators to yield blocks in order.
+type tstIter struct {
+       queryOpts *QueryOptions
+
+       parts []*part
+
+       // piPool owns one partIter per part; piHeap holds pointers into it.
+       piPool []partIter
+       piHeap partIterHeap
+
+       err error
+
+       // nextBlockNoop makes the first NextBlock call a no-op because init()
+       // already positioned the heap top at the first block.
+       nextBlockNoop bool
+}
+
+// reset clears all state, nilling slice elements so the referenced parts and
+// iterators can be garbage collected while the slices' capacity is reused.
+func (ti *tstIter) reset() {
+       for i := range ti.parts {
+               ti.parts[i] = nil
+       }
+       ti.parts = ti.parts[:0]
+
+       for i := range ti.piPool {
+               ti.piPool[i].reset()
+       }
+       ti.piPool = ti.piPool[:0]
+
+       for i := range ti.piHeap {
+               ti.piHeap[i] = nil
+       }
+       ti.piHeap = ti.piHeap[:0]
+
+       ti.err = nil
+       ti.nextBlockNoop = false
+}
+
+// init prepares iteration over parts for the given series IDs: it primes one
+// partIter per part and seeds the min-heap with every iterator that produced
+// a first block. ti.err is set to io.EOF when no part has a matching block.
+func (ti *tstIter) init(parts []*part, sids []common.SeriesID, queryOpts 
*QueryOptions) {
+       ti.reset()
+       ti.parts = parts
+       ti.queryOpts = queryOpts
+
+       // Grow piPool to len(parts) while reusing previously allocated
+       // iterators.
+       if n := len(ti.parts) - cap(ti.piPool); n > 0 {
+               ti.piPool = append(ti.piPool[:cap(ti.piPool)], make([]partIter, 
n)...)
+       }
+       ti.piPool = ti.piPool[:len(ti.parts)]
+       for i, p := range ti.parts {
+               ti.piPool[i].init(p, sids, queryOpts)
+       }
+
+       ti.piHeap = ti.piHeap[:0]
+       for i := range ti.piPool {
+               ps := &ti.piPool[i]
+               if !ps.NextBlock() {
+                       if err := ps.Error(); err != nil {
+                               ti.err = fmt.Errorf("cannot initialize tstable 
iteration: %w", err)
+                               return
+                       }
+                       // This part has no matching blocks; skip it.
+                       continue
+               }
+               ti.piHeap = append(ti.piHeap, ps)
+       }
+       if len(ti.piHeap) == 0 {
+               ti.err = io.EOF
+               return
+       }
+       heap.Init(&ti.piHeap)
+       // The heap top already holds the first block, so the first NextBlock
+       // call must not advance past it.
+       ti.nextBlockNoop = true
+}
+
+// NextBlock advances the merged iteration to the next block. It returns
+// false when iteration is finished or an error occurred; use Error() to
+// distinguish the two.
+func (ti *tstIter) NextBlock() bool {
+       if ti.err != nil {
+               return false
+       }
+       // init() already positioned the heap at the first block.
+       if ti.nextBlockNoop {
+               ti.nextBlockNoop = false
+               return true
+       }
+
+       ti.err = ti.nextBlock()
+       if ti.err != nil {
+               if ti.err != io.EOF {
+                       ti.err = fmt.Errorf("cannot obtain the next block to 
search in the partition: %w", ti.err)
+               }
+               return false
+       }
+       return true
+}
+
+// nextBlock advances the heap-top iterator; when that iterator is exhausted
+// it is popped, and io.EOF is returned once no iterators remain.
+func (ti *tstIter) nextBlock() error {
+       psMin := ti.piHeap[0]
+       if psMin.NextBlock() {
+               // The top iterator moved to a new block; restore heap order.
+               heap.Fix(&ti.piHeap, 0)
+               return nil
+       }
+
+       if err := psMin.Error(); err != nil {
+               return err
+       }
+
+       heap.Pop(&ti.piHeap)
+
+       if len(ti.piHeap) == 0 {
+               return io.EOF
+       }
+       return nil
+}
+
+// Error returns the last error. io.EOF marks normal exhaustion and is
+// reported as nil.
+func (ti *tstIter) Error() error {
+       if ti.err == io.EOF {
+               return nil
+       }
+       return ti.err
+}
+
+// partIterHeap is a min-heap of part iterators ordered by their current
+// block metadata (see blockMetadata.less), implementing heap.Interface.
+type partIterHeap []*partIter
+
+func (pih *partIterHeap) Len() int {
+       return len(*pih)
+}
+
+func (pih *partIterHeap) Less(i, j int) bool {
+       x := *pih
+       return x[i].bm.less(x[j].bm)
+}
+
+func (pih *partIterHeap) Swap(i, j int) {
+       x := *pih
+       x[i], x[j] = x[j], x[i]
+}
+
+func (pih *partIterHeap) Push(x any) {
+       *pih = append(*pih, x.(*partIter))
+}
+
+// Pop removes and returns the last element, per heap.Interface's contract.
+func (pih *partIterHeap) Pop() any {
+       a := *pih
+       v := a[len(a)-1]
+       *pih = a[:len(a)-1]
+       return v
+}
diff --git a/pkg/bytes/buffer.go b/pkg/bytes/buffer.go
index 369a9d62..8e6ffa9e 100644
--- a/pkg/bytes/buffer.go
+++ b/pkg/bytes/buffer.go
@@ -24,7 +24,10 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/fs"
 )
 
-var _ fs.Writer = (*Buffer)(nil)
+var (
+       _ fs.Writer = (*Buffer)(nil)
+       _ fs.Reader = (*Buffer)(nil)
+)
 
 type Buffer struct {
        Buf []byte
@@ -46,6 +49,11 @@ func (b *Buffer) Write(bb []byte) (int, error) {
        return len(bb), nil
 }
 
+// Read implements fs.Reader. It copies bytes starting at offset into buffer
+// and returns the number of bytes copied, which may be less than
+// len(buffer) when the buffer's tail is shorter.
+func (b *Buffer) Read(offset int64, buffer []byte) (int, error) {
+       // Guard the slice expression: b.Buf[offset:] panics when offset is
+       // negative or greater than len(b.Buf).
+       if offset < 0 || offset > int64(len(b.Buf)) {
+               return 0, fmt.Errorf("offset %d out of range [0, %d]", offset, len(b.Buf))
+       }
+       return copy(buffer, b.Buf[offset:]), nil
+}
+
 func (b *Buffer) Reset() {
        b.Buf = b.Buf[:0]
 }
diff --git a/pkg/bytes/buffer.go b/pkg/bytes/resize.go
similarity index 53%
copy from pkg/bytes/buffer.go
copy to pkg/bytes/resize.go
index 369a9d62..24a0a975 100644
--- a/pkg/bytes/buffer.go
+++ b/pkg/bytes/resize.go
@@ -17,52 +17,25 @@
 
 package bytes
 
-import (
-       "fmt"
-       "sync"
+import "math/bits"
 
-       "github.com/apache/skywalking-banyandb/pkg/fs"
-)
-
-var _ fs.Writer = (*Buffer)(nil)
-
-type Buffer struct {
-       Buf []byte
-}
-
-// Close implements fs.Writer.
-func (*Buffer) Close() error {
-       return nil
-}
-
-// Path implements fs.Writer.
-func (b *Buffer) Path() string {
-       return fmt.Sprintf("mem/%p", b)
-}
-
-// Write implements fs.Writer.
-func (b *Buffer) Write(bb []byte) (int, error) {
-       b.Buf = append(b.Buf, bb...)
-       return len(bb), nil
-}
-
-func (b *Buffer) Reset() {
-       b.Buf = b.Buf[:0]
-}
-
-type BufferPool struct {
-       p sync.Pool
// ResizeOver returns a byte slice of length n, reusing b's backing array
// when its capacity suffices. When growing, the new capacity is rounded up
// to the next power of two to amortize future growth. The contents of the
// returned slice are NOT preserved across a growth reallocation.
func ResizeOver(b []byte, n int) []byte {
	if n <= cap(b) {
		return b[:n]
	}
	return make([]byte, roundToNearestPow2(n))[:n]
}

// ResizeExact returns a byte slice of length n, reusing b's backing array
// when its capacity suffices; otherwise it allocates exactly n bytes. The
// contents of the returned slice are NOT preserved across a reallocation.
func ResizeExact(b []byte, n int) []byte {
	if n <= cap(b) {
		return b[:n]
	}
	return make([]byte, n)
}

// roundToNearestPow2 returns the smallest power of two that is >= n.
func roundToNearestPow2(n int) int {
	pow2 := uint8(bits.Len(uint(n - 1)))
	return 1 << pow2
}
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 2010e26e..08837991 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -45,6 +45,15 @@ type Writer interface {
        Close() error
 }
 
// Reader allows random-access reads from a file.
type Reader interface {
	// Read copies bytes starting at the given offset into buffer and
	// returns the number of bytes read.
	Read(offset int64, buffer []byte) (int, error)
	// Path returns the absolute path of the file.
	Path() string
	// Close closes the file.
	Close() error
}
+
 type Closer interface {
        // Returns the absolute path of the file.
        Path() string
@@ -55,10 +64,9 @@ type Closer interface {
 // File operation interface.
 type File interface {
        Writer
+       Reader
        // Vector Append mode, which supports appending consecutive buffers to 
the end of the file.
        Writev(iov *[][]byte) (int, error)
-       // Reading a specified location of file.
-       Read(offset int64, buffer []byte) (int, error)
        // Reading contiguous regions of a file and dispersing them into 
discontinuous buffers.
        Readv(offset int64, iov *[][]byte) (int, error)
        // Read the entire file using streaming read.
@@ -114,6 +122,17 @@ func MustWriteData(w Writer, data []byte) {
        }
 }
 
+// MustReadData reads data from r and panics if it cannot read all data.
+func MustReadData(r Reader, offset int64, buff []byte) {
+       n, err := r.Read(offset, buff)
+       if err != nil {
+               logger.GetLogger().Panic().Err(err).Str("path", 
r.Path()).Msg("cannot read data")
+       }
+       if n != len(buff) {
+               logger.GetLogger().Panic().Int("read", n).Int("expected", 
len(buff)).Str("path", r.Path()).Msg("BUG: reader read wrong number of bytes")
+       }
+}
+
 func MustClose(c Closer) {
        err := c.Close()
        if err != nil {
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 22382972..a0055009 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -21,6 +21,8 @@ package v1
 import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 // FindTagByName finds TagSpec in several tag families by its name.
@@ -70,3 +72,53 @@ func FieldValueTypeConv(fieldValue *modelv1.FieldValue) 
(tagType databasev1.Fiel
        }
        return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, false
 }
+
+type OrderBy struct {
+       Index *databasev1.IndexRule
+       Sort  modelv1.Sort
+}
+
+// AnyEntry is the `*` for a regular expression. It could match "any" Entry in 
an Entity.
+var anyEntry = &modelv1.TagValue_Null{}
+var AnyTagValue = &modelv1.TagValue{Value: anyEntry}
+
+type Tag struct {
+       Name   string
+       Values []*modelv1.TagValue
+}
+
+type TagFamily struct {
+       Name string
+       Tags []Tag
+}
+
+type Field struct {
+       Name   string
+       Values []*modelv1.FieldValue
+}
+
+type Result struct {
+       Timestamps  []int64
+       TagFamilies []TagFamily
+       Fields      []Field
+}
+
+type TagProjection struct {
+       Family string
+       Name   string
+}
+
+type MeasureQueryOptions struct {
+       Name            string
+       TimeRange       *timestamp.TimeRange
+       Entity          []*modelv1.TagValue
+       Filter          index.Filter
+       Order           *OrderBy
+       TagProjection   []TagProjection
+       FieldProjection []string
+}
+
+type MeasureQueryResult interface {
+       Pull() *Result
+       Release()
+}
diff --git a/banyand/internal/storage/series.go b/pkg/pb/v1/series.go
similarity index 70%
rename from banyand/internal/storage/series.go
rename to pkg/pb/v1/series.go
index 19bf4797..072e3290 100644
--- a/banyand/internal/storage/series.go
+++ b/pkg/pb/v1/series.go
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package storage
+package v1
 
 import (
-       "bytes"
        "sort"
        "sync"
 
@@ -38,7 +37,7 @@ type Series struct {
        ID           common.SeriesID
 }
 
-func (s *Series) marshal() error {
+func (s *Series) Marshal() error {
        s.Buffer = marshalEntityValue(s.Buffer, 
convert.StringToBytes(s.Subject))
        var err error
        for _, tv := range s.EntityValues {
@@ -50,7 +49,7 @@ func (s *Series) marshal() error {
        return nil
 }
 
-func (s *Series) unmarshal(src []byte) error {
+func (s *Series) Unmarshal(src []byte) error {
        var err error
        s.Buffer = s.Buffer[:0]
        if s.Buffer, src, err = unmarshalEntityValue(s.Buffer, src); err != nil 
{
@@ -92,60 +91,6 @@ func (sp *SeriesPool) Put(s *Series) {
        sp.pool.Put(s)
 }
 
-// AnyEntry is the `*` for a regular expression. It could match "any" Entry in 
an Entity.
-var AnyEntry = &modelv1.TagValue_Null{}
-
-const (
-       entityDelimiter = '|'
-       escape          = '\\'
-)
-
-var anyWildcard = []byte{'*'}
-
-func marshalEntityValue(dest, src []byte) []byte {
-       if src == nil {
-               dest = append(dest, entityDelimiter)
-               return dest
-       }
-       if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, 
escape) < 0 {
-               dest = append(dest, src...)
-               dest = append(dest, entityDelimiter)
-               return dest
-       }
-       for _, b := range src {
-               if b == entityDelimiter || b == escape {
-                       dest = append(dest, escape)
-               }
-               dest = append(dest, b)
-       }
-       dest = append(dest, entityDelimiter)
-       return dest
-}
-
-func unmarshalEntityValue(dest, src []byte) ([]byte, []byte, error) {
-       if len(src) == 0 {
-               return nil, nil, errors.New("empty entity value")
-       }
-       if src[0] == entityDelimiter {
-               return dest, src[1:], nil
-       }
-       for len(src) > 0 {
-               if src[0] == escape {
-                       if len(src) < 2 {
-                               return nil, nil, errors.New("invalid escape 
character")
-                       }
-                       src = src[1:]
-                       dest = append(dest, src[0])
-               } else if src[0] == entityDelimiter {
-                       return dest, src[1:], nil
-               } else {
-                       dest = append(dest, src[0])
-               }
-               src = src[1:]
-       }
-       return nil, nil, errors.New("invalid entity value")
-}
-
 // SeriesList is a collection of Series.
 type SeriesList []*Series
 
@@ -195,7 +140,7 @@ func (a SeriesList) Merge(other SeriesList) SeriesList {
        return final
 }
 
-func (a SeriesList) toList() posting.List {
+func (a SeriesList) ToList() posting.List {
        pl := roaring.NewPostingList()
        for _, v := range a {
                pl.Insert(uint64(v.ID))
diff --git a/banyand/internal/storage/series_test.go b/pkg/pb/v1/series_test.go
similarity index 97%
rename from banyand/internal/storage/series_test.go
rename to pkg/pb/v1/series_test.go
index f4a11e3c..c44c5a89 100644
--- a/banyand/internal/storage/series_test.go
+++ b/pkg/pb/v1/series_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package storage
+package v1
 
 import (
        "testing"
@@ -97,7 +97,7 @@ func TestMarshalAndUnmarshalSeries(t *testing.T) {
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        // Test Series.Marshal
-                       err := tt.src.marshal()
+                       err := tt.src.Marshal()
 
                        // Add assertions
                        assert.NoError(t, err)
@@ -107,7 +107,7 @@ func TestMarshalAndUnmarshalSeries(t *testing.T) {
 
                        // Test Series.Unmarshal
                        tt.src.reset()
-                       err = tt.src.unmarshal(marshaled)
+                       err = tt.src.Unmarshal(marshaled)
 
                        // Add assertions
                        assert.NoError(t, err)
diff --git a/banyand/internal/storage/value.go b/pkg/pb/v1/value.go
similarity index 63%
rename from banyand/internal/storage/value.go
rename to pkg/pb/v1/value.go
index 181f2a3d..d25dde07 100644
--- a/banyand/internal/storage/value.go
+++ b/pkg/pb/v1/value.go
@@ -15,12 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package storage
+package v1
 
 import (
+       "bytes"
+
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/pkg/errors"
 )
 
@@ -54,7 +57,7 @@ func MustTagValueToValueType(tag *modelv1.TagValue) ValueType 
{
 }
 
 func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) {
-       if tv.Value == AnyEntry {
+       if tv == AnyTagValue {
                dest = marshalEntityValue(dest, anyWildcard)
                return dest, nil
        }
@@ -116,3 +119,88 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte, 
[]byte, *modelv1.TagVal
        }
        return nil, nil, nil, errors.New("unsupported tag value type")
 }
+
const (
	entityDelimiter = '|'
	escape          = '\\'
)

var anyWildcard = []byte{'*'}

// marshalEntityValue appends src to dest in the entity-value wire format:
// delimiter and escape bytes occurring inside src are escaped, and a single
// trailing delimiter terminates the value. A nil src encodes as a bare
// delimiter.
func marshalEntityValue(dest, src []byte) []byte {
	if src == nil {
		return append(dest, entityDelimiter)
	}
	if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, escape) < 0 {
		// Fast path: nothing needs escaping, copy verbatim.
		dest = append(dest, src...)
		return append(dest, entityDelimiter)
	}
	for _, c := range src {
		switch c {
		case entityDelimiter, escape:
			dest = append(dest, escape, c)
		default:
			dest = append(dest, c)
		}
	}
	return append(dest, entityDelimiter)
}

// unmarshalEntityValue decodes one delimiter-terminated value from src,
// appending the unescaped bytes to dest. It returns dest, the remainder of
// src after the delimiter, and an error on malformed input (empty src, a
// dangling escape, or a missing terminating delimiter).
func unmarshalEntityValue(dest, src []byte) ([]byte, []byte, error) {
	if len(src) == 0 {
		return nil, nil, errors.New("empty entity value")
	}
	if src[0] == entityDelimiter {
		return dest, src[1:], nil
	}
	for i := 0; i < len(src); i++ {
		switch src[i] {
		case escape:
			if i == len(src)-1 {
				return nil, nil, errors.New("invalid escape character")
			}
			i++
			dest = append(dest, src[i])
		case entityDelimiter:
			return dest, src[i+1:], nil
		default:
			dest = append(dest, src[i])
		}
	}
	return nil, nil, errors.New("invalid entity value")
}
+
+func MustCompareTagValue(tv1, tv2 *modelv1.TagValue) int {
+       if tv1 == nil && tv2 == nil {
+               return 0
+       }
+       if tv1 == nil {
+               return -1
+       }
+       if tv2 == nil {
+               return 1
+       }
+       if tv1 == AnyTagValue {
+               return 1
+       }
+       if tv2 == AnyTagValue {
+               return -1
+       }
+       vt1 := MustTagValueToValueType(tv1)
+       vt2 := MustTagValueToValueType(tv2)
+       if vt1 != vt2 {
+               logger.Panicf("inconsistent tag value type: %v vs %v", vt1, vt2)
+       }
+       switch vt1 {
+       case ValueTypeStr:
+               return bytes.Compare(convert.StringToBytes(tv1.GetStr().Value), 
convert.StringToBytes(tv2.GetStr().Value))
+       case ValueTypeInt64:
+               return int(tv1.GetInt().Value - tv2.GetInt().Value)
+       case ValueTypeBinaryData:
+               return bytes.Compare(tv1.GetBinaryData(), tv2.GetBinaryData())
+       default:
+               logger.Panicf("unsupported tag value type: %v", vt1)
+               return 0
+       }
+}
diff --git a/banyand/internal/storage/value_test.go b/pkg/pb/v1/value_test.go
similarity index 99%
rename from banyand/internal/storage/value_test.go
rename to pkg/pb/v1/value_test.go
index 950cb99f..17f77fed 100644
--- a/banyand/internal/storage/value_test.go
+++ b/pkg/pb/v1/value_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package storage
+package v1
 
 import (
        "testing"
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index a206293b..1c5eeb82 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -27,6 +27,7 @@ import (
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/bus"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 // ExecutionContext allows retrieving data from tsdb.
@@ -64,8 +65,7 @@ type StreamExecutable interface {
 
 // MeasureExecutionContext allows retrieving data through the measure module.
 type MeasureExecutionContext interface {
-       ExecutionContext
-       ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, 
error)
+       Query(ctx context.Context, opts pbv1.MeasureQueryOptions) 
(pbv1.MeasureQueryResult, error)
 }
 
 // MeasureExecutionContextKey is the key of measure execution context in 
context.Context.
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 9d197df4..6c3eb7e0 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -59,7 +59,7 @@ func ProjectItem(ec executor.ExecutionContext, item 
tsdb.Item, projectionFieldRe
                if len(refs) == 0 {
                        continue
                }
-               familyName := refs[0].Tag.getFamilyName()
+               familyName := refs[0].Tag.GetFamilyName()
                parsedTagFamily, err := ec.ParseTagFamily(familyName, item)
                if err != nil {
                        return nil, errors.WithMessage(err, "parse projection")
@@ -173,11 +173,11 @@ func (t *Tag) GetCompoundName() string {
        return t.familyName + ":" + t.name
 }
 
-func (t *Tag) getTagName() string {
+func (t *Tag) GetTagName() string {
        return t.name
 }
 
-func (t *Tag) getFamilyName() string {
+func (t *Tag) GetFamilyName() string {
        return t.familyName
 }
 
diff --git a/pkg/query/logical/expr.go b/pkg/query/logical/expr.go
index b750dcde..ed0bdeda 100644
--- a/pkg/query/logical/expr.go
+++ b/pkg/query/logical/expr.go
@@ -37,7 +37,7 @@ type TagRef struct {
 // Equal reports whether f and expr have the same name and data type.
 func (f *TagRef) Equal(expr Expr) bool {
        if other, ok := expr.(*TagRef); ok {
-               return other.Tag.getTagName() == f.Tag.getTagName() && 
other.Spec.Spec.GetType() == f.Spec.Spec.GetType()
+               return other.Tag.GetTagName() == f.Tag.GetTagName() && 
other.Spec.Spec.GetType() == f.Spec.Spec.GetType()
        }
        return false
 }
diff --git a/pkg/query/logical/index_filter.go 
b/pkg/query/logical/index_filter.go
index 4ab9ff01..78a210c5 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/index_filter.go
@@ -32,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 var errInvalidLogicalExpression = errors.New("invalid logical expression")
@@ -45,9 +46,9 @@ type GlobalIndexError struct {
 
 func (g GlobalIndexError) Error() string { return g.IndexRule.String() }
 
-// BuildLocalFilter returns a new index.Filter for local indices.
+// BuildLocalFilterDeprecated returns a new index.Filter for local indices.
 // It could parse series Path at the same time.
-func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict 
map[string]int,
+func BuildLocalFilterDeprecated(criteria *modelv1.Criteria, schema Schema, 
entityDict map[string]int,
        entity tsdb.Entity, mandatoryIndexRule bool,
 ) (index.Filter, []tsdb.Entity, error) {
        if criteria == nil {
@@ -56,7 +57,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
        switch criteria.GetExp().(type) {
        case *modelv1.Criteria_Condition:
                cond := criteria.GetCondition()
-               expr, parsedEntity, err := parseExprOrEntity(entityDict, 
entity, cond)
+               expr, parsedEntity, err := 
parseExprOrEntityDeprecated(entityDict, entity, cond)
                if err != nil {
                        return nil, nil, err
                }
@@ -75,7 +76,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
                                        return nil, nil, 
errors.Wrapf(errUnsupportedConditionOp, "gobal index conf:%s", cond)
                                }
                        }
-                       return parseCondition(cond, indexRule, expr, entity)
+                       return parseConditionDeprecated(cond, indexRule, expr, 
entity)
                } else if mandatoryIndexRule {
                        return nil, nil, 
errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond)
                }
@@ -86,20 +87,20 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
                        return nil, nil, 
errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of 
[%v] are empty", criteria)
                }
                if le.GetLeft() == nil {
-                       return BuildLocalFilter(le.Right, schema, entityDict, 
entity, mandatoryIndexRule)
+                       return BuildLocalFilterDeprecated(le.Right, schema, 
entityDict, entity, mandatoryIndexRule)
                }
                if le.GetRight() == nil {
-                       return BuildLocalFilter(le.Left, schema, entityDict, 
entity, mandatoryIndexRule)
+                       return BuildLocalFilterDeprecated(le.Left, schema, 
entityDict, entity, mandatoryIndexRule)
                }
-               left, leftEntities, err := BuildLocalFilter(le.Left, schema, 
entityDict, entity, mandatoryIndexRule)
+               left, leftEntities, err := BuildLocalFilterDeprecated(le.Left, 
schema, entityDict, entity, mandatoryIndexRule)
                if err != nil {
                        return nil, nil, err
                }
-               right, rightEntities, err := BuildLocalFilter(le.Right, schema, 
entityDict, entity, mandatoryIndexRule)
+               right, rightEntities, err := 
BuildLocalFilterDeprecated(le.Right, schema, entityDict, entity, 
mandatoryIndexRule)
                if err != nil {
                        return nil, nil, err
                }
-               entities := parseEntities(le.Op, entity, leftEntities, 
rightEntities)
+               entities := parseEntitiesDeprecated(le.Op, entity, 
leftEntities, rightEntities)
                if entities == nil {
                        return nil, nil, nil
                }
@@ -120,7 +121,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
        return nil, nil, errInvalidCriteriaType
 }
 
-func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, 
expr LiteralExpr, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) {
+func parseConditionDeprecated(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule, expr LiteralExpr, entity tsdb.Entity) (index.Filter, 
[]tsdb.Entity, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
                return newRange(indexRule, index.RangeOpts{
@@ -178,7 +179,7 @@ func parseCondition(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule, ex
        return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "index 
filter parses %v", cond)
 }
 
-func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond 
*modelv1.Condition) (LiteralExpr, []tsdb.Entity, error) {
+func parseExprOrEntityDeprecated(entityDict map[string]int, entity 
tsdb.Entity, cond *modelv1.Condition) (LiteralExpr, []tsdb.Entity, error) {
        entityIdx, ok := entityDict[cond.Name]
        if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != 
modelv1.Condition_BINARY_OP_IN {
                return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, 
"tag belongs to the entity only supports EQ or IN operation in condition(%v)", 
cond)
@@ -236,7 +237,7 @@ func parseExprOrEntity(entityDict map[string]int, entity 
tsdb.Entity, cond *mode
        return nil, nil, errors.WithMessagef(errUnsupportedConditionValue, 
"index filter parses %v", cond)
 }
 
-func parseEntities(op modelv1.LogicalExpression_LogicalOp, input tsdb.Entity, 
left, right []tsdb.Entity) []tsdb.Entity {
+func parseEntitiesDeprecated(op modelv1.LogicalExpression_LogicalOp, input 
tsdb.Entity, left, right []tsdb.Entity) []tsdb.Entity {
        count := len(input)
        result := make(tsdb.Entity, count)
        anyEntity := func(entities []tsdb.Entity) bool {
@@ -304,6 +305,275 @@ func parseEntities(op 
modelv1.LogicalExpression_LogicalOp, input tsdb.Entity, le
        return []tsdb.Entity{result}
 }
 
+func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict 
map[string]int,
+       entity []*modelv1.TagValue, mandatoryIndexRule bool,
+) (index.Filter, [][]*modelv1.TagValue, error) {
+       if criteria == nil {
+               return nil, [][]*modelv1.TagValue{entity}, nil
+       }
+       switch criteria.GetExp().(type) {
+       case *modelv1.Criteria_Condition:
+               cond := criteria.GetCondition()
+               expr, parsedEntity, err := parseExprOrEntity(entityDict, 
entity, cond)
+               if err != nil {
+                       return nil, nil, err
+               }
+               if parsedEntity != nil {
+                       return nil, parsedEntity, nil
+               }
+               if ok, indexRule := schema.IndexDefined(cond.Name); ok {
+                       if indexRule.Location == 
databasev1.IndexRule_LOCATION_GLOBAL {
+                               switch cond.Op {
+                               case modelv1.Condition_BINARY_OP_EQ, 
modelv1.Condition_BINARY_OP_IN:
+                                       return nil, nil, GlobalIndexError{
+                                               IndexRule: indexRule,
+                                               Expr:      expr,
+                                       }
+                               default:
+                                       return nil, nil, 
errors.Wrapf(errUnsupportedConditionOp, "gobal index conf:%s", cond)
+                               }
+                       }
+                       return parseCondition(cond, indexRule, expr, entity)
+               } else if mandatoryIndexRule {
+                       return nil, nil, 
errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond)
+               }
+               return eNode, [][]*modelv1.TagValue{entity}, nil
+       case *modelv1.Criteria_Le:
+               le := criteria.GetLe()
+               if le.GetLeft() == nil && le.GetRight() == nil {
+                       return nil, nil, 
errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of 
[%v] are empty", criteria)
+               }
+               if le.GetLeft() == nil {
+                       return BuildLocalFilter(le.Right, schema, entityDict, 
entity, mandatoryIndexRule)
+               }
+               if le.GetRight() == nil {
+                       return BuildLocalFilter(le.Left, schema, entityDict, 
entity, mandatoryIndexRule)
+               }
+               left, leftEntities, err := BuildLocalFilter(le.Left, schema, 
entityDict, entity, mandatoryIndexRule)
+               if err != nil {
+                       return nil, nil, err
+               }
+               right, rightEntities, err := BuildLocalFilter(le.Right, schema, 
entityDict, entity, mandatoryIndexRule)
+               if err != nil {
+                       return nil, nil, err
+               }
+               entities := parseEntities(le.Op, entity, leftEntities, 
rightEntities)
+               if entities == nil {
+                       return nil, nil, nil
+               }
+               if left == nil && right == nil {
+                       return nil, entities, nil
+               }
+               switch le.Op {
+               case modelv1.LogicalExpression_LOGICAL_OP_AND:
+                       and := newAnd(2)
+                       and.append(left).append(right)
+                       return and, entities, nil
+               case modelv1.LogicalExpression_LOGICAL_OP_OR:
+                       or := newOr(2)
+                       or.append(left).append(right)
+                       return or, entities, nil
+               }
+       }
+       return nil, nil, errInvalidCriteriaType
+}
+
// parseCondition translates a single condition on an indexed (non-entity)
// tag into an index.Filter. The caller's entity is passed through unchanged
// as the sole candidate entity.
func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, expr LiteralExpr, entity []*modelv1.TagValue) (index.Filter, [][]*modelv1.TagValue, error) {
	switch cond.Op {
	// Range operators map directly onto index.RangeOpts bounds.
	case modelv1.Condition_BINARY_OP_GT:
		return newRange(indexRule, index.RangeOpts{
			Lower: bytes.Join(expr.Bytes(), nil),
		}), [][]*modelv1.TagValue{entity}, nil
	case modelv1.Condition_BINARY_OP_GE:
		return newRange(indexRule, index.RangeOpts{
			IncludesLower: true,
			Lower:         bytes.Join(expr.Bytes(), nil),
		}), [][]*modelv1.TagValue{entity}, nil
	case modelv1.Condition_BINARY_OP_LT:
		return newRange(indexRule, index.RangeOpts{
			Upper: bytes.Join(expr.Bytes(), nil),
		}), [][]*modelv1.TagValue{entity}, nil
	case modelv1.Condition_BINARY_OP_LE:
		return newRange(indexRule, index.RangeOpts{
			IncludesUpper: true,
			Upper:         bytes.Join(expr.Bytes(), nil),
		}), [][]*modelv1.TagValue{entity}, nil
	case modelv1.Condition_BINARY_OP_EQ:
		return newEq(indexRule, expr), [][]*modelv1.TagValue{entity}, nil
	case modelv1.Condition_BINARY_OP_MATCH:
		return newMatch(indexRule, expr), [][]*modelv1.TagValue{entity}, nil
	case modelv1.Condition_BINARY_OP_NE:
		return newNot(indexRule, newEq(indexRule, expr)), [][]*modelv1.TagValue{entity}, nil
	// HAVING: every listed element must be present -> AND of equalities.
	case modelv1.Condition_BINARY_OP_HAVING:
		bb := expr.Bytes()
		and := newAnd(len(bb))
		for _, b := range bb {
			and.append(newEq(indexRule, newBytesLiteral(b)))
		}
		return and, [][]*modelv1.TagValue{entity}, nil
	// NOT_HAVING is the negation of the HAVING conjunction.
	case modelv1.Condition_BINARY_OP_NOT_HAVING:
		bb := expr.Bytes()
		and := newAnd(len(bb))
		for _, b := range bb {
			and.append(newEq(indexRule, newBytesLiteral(b)))
		}
		return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, nil
	// IN: any listed element may match -> OR of equalities.
	case modelv1.Condition_BINARY_OP_IN:
		bb := expr.Bytes()
		or := newOr(len(bb))
		for _, b := range bb {
			or.append(newEq(indexRule, newBytesLiteral(b)))
		}
		return or, [][]*modelv1.TagValue{entity}, nil
	// NOT_IN is the negation of the IN disjunction.
	case modelv1.Condition_BINARY_OP_NOT_IN:
		bb := expr.Bytes()
		or := newOr(len(bb))
		for _, b := range bb {
			or.append(newEq(indexRule, newBytesLiteral(b)))
		}
		return newNot(indexRule, or), [][]*modelv1.TagValue{entity}, nil
	}
	return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "index filter parses %v", cond)
}
+
// parseExprOrEntity inspects a condition and returns either a literal
// expression (for non-entity tags, to be matched against an index) or a set
// of concrete entities (when the condition addresses an entity tag listed
// in entityDict). Exactly one of the two results is non-nil on success.
// Entity tags only support EQ and IN.
func parseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) {
	entityIdx, ok := entityDict[cond.Name]
	if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != modelv1.Condition_BINARY_OP_IN {
		return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "tag belongs to the entity only supports EQ or IN operation in condition(%v)", cond)
	}
	switch v := cond.Value.Value.(type) {
	case *modelv1.TagValue_Str:
		if ok {
			// Entity tag with EQ: pin the entity entry to the condition value.
			parsedEntity := make([]*modelv1.TagValue, len(entity))
			copy(parsedEntity, entity)
			parsedEntity[entityIdx] = cond.Value
			return nil, [][]*modelv1.TagValue{parsedEntity}, nil
		}
		return str(v.Str.GetValue()), nil, nil
	case *modelv1.TagValue_StrArray:
		if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
			// Entity tag with IN: one candidate entity per listed value.
			entities := make([][]*modelv1.TagValue, len(v.StrArray.Value))
			for i, va := range v.StrArray.Value {
				parsedEntity := make([]*modelv1.TagValue, len(entity))
				copy(parsedEntity, entity)
				parsedEntity[entityIdx] = &modelv1.TagValue{
					Value: &modelv1.TagValue_Str{
						Str: &modelv1.Str{
							Value: va,
						},
					},
				}
				entities[i] = parsedEntity
			}
			return nil, entities, nil
		}
		return &strArrLiteral{
			arr: v.StrArray.GetValue(),
		}, nil, nil
	case *modelv1.TagValue_Int:
		if ok {
			// Entity tag with EQ: pin the entity entry to the condition value.
			parsedEntity := make([]*modelv1.TagValue, len(entity))
			copy(parsedEntity, entity)
			parsedEntity[entityIdx] = cond.Value
			return nil, [][]*modelv1.TagValue{parsedEntity}, nil
		}
		return &int64Literal{
			int64: v.Int.GetValue(),
		}, nil, nil
	case *modelv1.TagValue_IntArray:
		if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
			// Entity tag with IN: one candidate entity per listed value.
			entities := make([][]*modelv1.TagValue, len(v.IntArray.Value))
			for i, va := range v.IntArray.Value {
				parsedEntity := make([]*modelv1.TagValue, len(entity))
				copy(parsedEntity, entity)
				parsedEntity[entityIdx] = &modelv1.TagValue{
					Value: &modelv1.TagValue_Int{
						Int: &modelv1.Int{
							Value: va,
						},
					},
				}
				entities[i] = parsedEntity
			}
			return nil, entities, nil
		}
		return &int64ArrLiteral{
			arr: v.IntArray.GetValue(),
		}, nil, nil
	case *modelv1.TagValue_Null:
		return nullLiteralExpr, nil, nil
	}
	return nil, nil, errors.WithMessagef(errUnsupportedConditionValue, "index filter parses %v", cond)
}
+
+func parseEntities(op modelv1.LogicalExpression_LogicalOp, input 
[]*modelv1.TagValue, left, right [][]*modelv1.TagValue) [][]*modelv1.TagValue {
+       count := len(input)
+       result := make([]*modelv1.TagValue, count)
+       anyEntity := func(entities [][]*modelv1.TagValue) bool {
+               for _, entity := range entities {
+                       for _, entry := range entity {
+                               if entry == pbv1.AnyTagValue {
+                                       return false
+                               }
+                       }
+               }
+               return true
+       }
+       leftAny := anyEntity(left)
+       rightAny := anyEntity(right)
+
+       mergedEntities := make([][]*modelv1.TagValue, 0, len(left)+len(right))
+
+       switch op {
+       case modelv1.LogicalExpression_LOGICAL_OP_AND:
+               if leftAny && !rightAny {
+                       return right
+               }
+               if !leftAny && rightAny {
+                       return left
+               }
+               mergedEntities = append(mergedEntities, left...)
+               mergedEntities = append(mergedEntities, right...)
+               for i := 0; i < count; i++ {
+                       entry := pbv1.AnyTagValue
+                       for j := 0; j < len(mergedEntities); j++ {
+                               e := mergedEntities[j][i]
+                               if e == pbv1.AnyTagValue {
+                                       continue
+                               }
+                               if entry == pbv1.AnyTagValue {
+                                       entry = e
+                               } else if pbv1.MustCompareTagValue(entry, e) != 
0 {
+                                       return nil
+                               }
+                       }
+                       result[i] = entry
+               }
+       case modelv1.LogicalExpression_LOGICAL_OP_OR:
+               if leftAny {
+                       return left
+               }
+               if rightAny {
+                       return right
+               }
+               mergedEntities = append(mergedEntities, left...)
+               mergedEntities = append(mergedEntities, right...)
+               for i := 0; i < count; i++ {
+                       entry := pbv1.AnyTagValue
+                       for j := 0; j < len(mergedEntities); j++ {
+                               e := mergedEntities[j][i]
+                               if entry == pbv1.AnyTagValue {
+                                       entry = e
+                               } else if pbv1.MustCompareTagValue(entry, e) != 
0 {
+                                       return mergedEntities
+                               }
+                       }
+                       result[i] = entry
+               }
+       }
+       return [][]*modelv1.TagValue{result}
+}
+
 type fieldKey struct {
        *databasev1.IndexRule
 }
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 7d9a1fa8..d7866d80 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -20,19 +20,16 @@ package measure
 import (
        "context"
        "fmt"
-       "io"
        "time"
 
-       "go.uber.org/multierr"
        "google.golang.org/protobuf/types/known/timestamppb"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,8 +48,17 @@ type unresolvedIndexScan struct {
 }
 
 func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, 
error) {
+       var projTags []pbv1.TagProjection
        var projTagsRefs [][]*logical.TagRef
        if len(uis.projectionTags) > 0 {
+               for i := range uis.projectionTags {
+                       for _, tag := range uis.projectionTags[i] {
+                               projTags = append(projTags, pbv1.TagProjection{
+                                       Family: tag.GetFamilyName(),
+                                       Name:   tag.GetTagName(),
+                               })
+                       }
+               }
                var err error
                projTagsRefs, err = s.CreateTagRef(uis.projectionTags...)
                if err != nil {
@@ -60,8 +66,12 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) 
(logical.Plan, error)
                }
        }
 
+       var projField []string
        var projFieldRefs []*logical.FieldRef
        if len(uis.projectionFields) > 0 {
+               for i := range uis.projectionFields {
+                       projField = append(projField, 
uis.projectionFields[i].Name)
+               }
                var err error
                projFieldRefs, err = s.CreateFieldRef(uis.projectionFields...)
                if err != nil {
@@ -71,11 +81,11 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) 
(logical.Plan, error)
 
        entityList := s.EntityList()
        entityMap := make(map[string]int)
-       entity := make([]tsdb.Entry, len(entityList))
+       entity := make([]*modelv1.TagValue, len(entityList))
        for idx, e := range entityList {
                entityMap[e] = idx
                // fill AnyEntry by default
-               entity[idx] = tsdb.AnyEntry
+               entity[idx] = pbv1.AnyTagValue
        }
        filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, 
entityMap, entity, true)
        if err != nil {
@@ -85,6 +95,8 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) 
(logical.Plan, error)
        return &localIndexScan{
                timeRange:            
timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime),
                schema:               s,
+               projectionTags:       projTags,
+               projectionFields:     projField,
                projectionTagsRefs:   projTagsRefs,
                projectionFieldsRefs: projFieldRefs,
                metadata:             uis.metadata,
@@ -108,9 +120,11 @@ type localIndexScan struct {
        metadata             *commonv1.Metadata
        l                    *logger.Logger
        timeRange            timestamp.TimeRange
+       projectionTags       []pbv1.TagProjection
+       projectionFields     []string
        projectionTagsRefs   [][]*logical.TagRef
        projectionFieldsRefs []*logical.FieldRef
-       entities             []tsdb.Entity
+       entities             [][]*modelv1.TagValue
        groupByEntity        bool
        maxDataPointsSize    int
 }
@@ -124,72 +138,41 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) {
 }
 
 func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, 
err error) {
-       var orderBy *tsdb.OrderBy
+       var orderBy *pbv1.OrderBy
        if i.order.Index != nil {
-               orderBy = &tsdb.OrderBy{
+               orderBy = &pbv1.OrderBy{
                        Index: i.order.Index,
                        Sort:  i.order.Sort,
                }
+       } else if i.groupByEntity {
+               orderBy = &pbv1.OrderBy{
+                       Sort: i.order.Sort,
+               }
        }
        ec := executor.FromMeasureExecutionContext(ctx)
-       var seriesList tsdb.SeriesList
+       var results []pbv1.MeasureQueryResult
        for _, e := range i.entities {
-               shards, errInternal := ec.Shards(e)
-               if errInternal != nil {
-                       return nil, errInternal
-               }
-               for _, shard := range shards {
-                       sl, errInternal := shard.Series().Search(
-                               context.WithValue(
-                                       ctx,
-                                       logger.ContextKey,
-                                       i.l,
-                               ),
-                               tsdb.NewPath(e),
-                               i.filter,
-                               orderBy,
-                       )
-                       if errInternal != nil {
-                               return nil, errInternal
-                       }
-                       seriesList = seriesList.Merge(sl)
-               }
-       }
-       if len(seriesList) == 0 {
-               return dummyIter, nil
-       }
-       var builders []logical.SeekerBuilder
-       if i.order.Index == nil {
-               builders = append(builders, func(builder tsdb.SeekerBuilder) {
-                       builder.OrderByTime(i.order.Sort)
+               result, err := ec.Query(ctx, pbv1.MeasureQueryOptions{
+                       Name:            i.metadata.GetName(),
+                       TimeRange:       &i.timeRange,
+                       Entity:          e,
+                       Filter:          i.filter,
+                       Order:           orderBy,
+                       TagProjection:   i.projectionTags,
+                       FieldProjection: i.projectionFields,
                })
-       }
-       // CAVEAT: the order of series list matters when sorting by an index.
-       iters, closers, err := logical.ExecuteForShard(ctx, i.l, seriesList, 
i.timeRange, builders...)
-       if err != nil {
-               return nil, err
-       }
-       if len(closers) > 0 {
-               defer func(closers []io.Closer) {
-                       for _, c := range closers {
-                               err = multierr.Append(err, c.Close())
-                       }
-               }(closers)
-       }
+               if err != nil {
+                       return nil, fmt.Errorf("failed to query measure: %w", 
err)
+               }
 
-       if len(iters) == 0 {
-               return dummyIter, nil
+               results = append(results, result)
        }
-       transformContext := transformContext{
-               ec:                   ec,
-               projectionTagsRefs:   i.projectionTagsRefs,
-               projectionFieldsRefs: i.projectionFieldsRefs,
-       }
-       if i.groupByEntity {
-               return newSeriesMIterator(iters, transformContext, 
i.maxDataPointsSize), nil
+       if len(results) == 0 {
+               return dummyIter, nil
        }
-       it := logical.NewItemIter(iters, i.order.Sort)
-       return newIndexScanIterator(it, transformContext, i.maxDataPointsSize), 
nil
+       return &resultMIterator{
+               results: results,
+       }, nil
 }
 
 func (i *localIndexScan) String() string {
@@ -223,129 +206,59 @@ func indexScan(startTime, endTime time.Time, metadata 
*commonv1.Metadata, projec
        }
 }
 
-var _ executor.MIterator = (*indexScanIterator)(nil)
-
-type indexScanIterator struct {
-       inner   sort.Iterator[tsdb.Item]
-       err     error
-       current *measurev1.DataPoint
-       context transformContext
-       max     int
-       num     int
-}
-
-func newIndexScanIterator(inner sort.Iterator[tsdb.Item], context 
transformContext, max int) executor.MIterator {
-       return &indexScanIterator{
-               inner:   inner,
-               context: context,
-               max:     max,
-       }
-}
-
-func (ism *indexScanIterator) Next() bool {
-       if !ism.inner.Next() || ism.err != nil || ism.num > ism.max {
-               return false
-       }
-       nextItem := ism.inner.Val()
-       var err error
-       if ism.current, err = transform(nextItem, ism.context); err != nil {
-               ism.err = multierr.Append(ism.err, err)
-       }
-       ism.num++
-       return true
-}
-
-func (ism *indexScanIterator) Current() []*measurev1.DataPoint {
-       if ism.current == nil {
-               return nil
-       }
-       return []*measurev1.DataPoint{ism.current}
-}
-
-func (ism *indexScanIterator) Close() error {
-       return multierr.Combine(ism.err, ism.inner.Close())
-}
-
-var _ executor.MIterator = (*seriesIterator)(nil)
-
-type seriesIterator struct {
-       err     error
-       context transformContext
-       inner   []tsdb.Iterator
+type resultMIterator struct {
+       results []pbv1.MeasureQueryResult
        current []*measurev1.DataPoint
        index   int
-       num     int
-       max     int
 }
 
-func newSeriesMIterator(inner []tsdb.Iterator, context transformContext, max 
int) executor.MIterator {
-       return &seriesIterator{
-               inner:   inner,
-               context: context,
-               index:   -1,
-               max:     max,
-       }
-}
-
-func (ism *seriesIterator) Next() bool {
-       if ism.err != nil || ism.num > ism.max {
-               return false
-       }
-       ism.index++
-       if ism.index >= len(ism.inner) {
+func (ei *resultMIterator) Next() bool {
+       if ei.index >= len(ei.results) {
                return false
        }
-       iter := ism.inner[ism.index]
-       if ism.current != nil {
-               ism.current = ism.current[:0]
+
+       r := ei.results[ei.index].Pull()
+       if r == nil {
+               ei.index++
+               return ei.Next()
        }
-       for iter.Next() {
-               dp, err := transform(iter.Val(), ism.context)
-               if err != nil {
-                       ism.err = err
-                       return false
+       for i := range r.Timestamps {
+               dp := &measurev1.DataPoint{
+                       Timestamp: timestamppb.New(time.Unix(0, 
int64(r.Timestamps[i]))),
                }
-               ism.current = append(ism.current, dp)
-       }
-       ism.num++
-       return true
-}
 
-func (ism *seriesIterator) Current() []*measurev1.DataPoint {
-       return ism.current
-}
-
-func (ism *seriesIterator) Close() error {
-       for _, i := range ism.inner {
-               ism.err = multierr.Append(ism.err, i.Close())
+               for _, tf := range r.TagFamilies {
+                       tagFamily := &modelv1.TagFamily{
+                               Name: tf.Name,
+                       }
+                       dp.TagFamilies = append(dp.TagFamilies, tagFamily)
+                       for _, t := range tf.Tags {
+                               tagFamily.Tags = append(tagFamily.Tags, 
&modelv1.Tag{
+                                       Key:   t.Name,
+                                       Value: t.Values[i],
+                               })
+                       }
+               }
+               for _, f := range r.Fields {
+                       dp.Fields = append(dp.Fields, 
&measurev1.DataPoint_Field{
+                               Name:  f.Name,
+                               Value: f.Values[i],
+                       })
+               }
        }
-       return ism.err
+
+       return true
 }
 
-type transformContext struct {
-       ec                   executor.MeasureExecutionContext
-       projectionTagsRefs   [][]*logical.TagRef
-       projectionFieldsRefs []*logical.FieldRef
+func (ei *resultMIterator) Current() []*measurev1.DataPoint {
+       return ei.current
 }
 
-func transform(item tsdb.Item, ism transformContext) (*measurev1.DataPoint, 
error) {
-       tagFamilies, err := logical.ProjectItem(ism.ec, item, 
ism.projectionTagsRefs)
-       if err != nil {
-               return nil, err
+func (ei *resultMIterator) Close() error {
+       for _, result := range ei.results {
+               result.Release()
        }
-       dpFields := make([]*measurev1.DataPoint_Field, 0)
-       for _, f := range ism.projectionFieldsRefs {
-               fieldVal, parserFieldErr := ism.ec.ParseField(f.Field.Name, 
item)
-               if parserFieldErr != nil {
-                       return nil, parserFieldErr
-               }
-               dpFields = append(dpFields, fieldVal)
-       }
-       return &measurev1.DataPoint{
-               Fields:      dpFields,
-               TagFamilies: tagFamilies,
-               Timestamp:   timestamppb.New(time.Unix(0, int64(item.Time()))),
-       }, nil
+       return nil
 }
 
 var dummyIter = dummyMIterator{}
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go 
b/pkg/query/logical/measure/topn_plan_localscan.go
index 702dc57b..e04ba384 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -21,20 +21,14 @@ package measure
 import (
        "context"
        "fmt"
-       "io"
        "time"
 
        "github.com/pkg/errors"
-       "go.uber.org/multierr"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/banyand/measure"
-       "github.com/apache/skywalking-banyandb/banyand/tsdb"
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -54,7 +48,16 @@ type unresolvedLocalScan struct {
 
 func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, 
error) {
        var projTagsRefs [][]*logical.TagRef
+       var projTags []pbv1.TagProjection
        if len(uls.projectionTags) > 0 {
+               for i := range uls.projectionTags {
+                       for _, tag := range uls.projectionTags[i] {
+                               projTags = append(projTags, pbv1.TagProjection{
+                                       Family: tag.GetFamilyName(),
+                                       Name:   tag.GetTagName(),
+                               })
+                       }
+               }
                var err error
                projTagsRefs, err = s.CreateTagRef(uls.projectionTags...)
                if err != nil {
@@ -63,7 +66,11 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) 
(logical.Plan, error)
        }
 
        var projFieldRefs []*logical.FieldRef
+       var projField []string
        if len(uls.projectionFields) > 0 {
+               for i := range uls.projectionFields {
+                       projField = append(projField, 
uls.projectionFields[i].Name)
+               }
                var err error
                projFieldRefs, err = s.CreateFieldRef(uls.projectionFields...)
                if err != nil {
@@ -81,34 +88,41 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) 
(logical.Plan, error)
                schema:               s,
                projectionTagsRefs:   projTagsRefs,
                projectionFieldsRefs: projFieldRefs,
+               projectionTags:       projTags,
+               projectionFields:     projField,
                metadata:             uls.metadata,
                entity:               entity,
                l:                    logger.GetLogger("topn", "measure", 
uls.metadata.Group, uls.metadata.Name, "local-index"),
        }, nil
 }
 
-func (uls *unresolvedLocalScan) locateEntity(entityList []string) 
(tsdb.Entity, error) {
+func (uls *unresolvedLocalScan) locateEntity(entityList []string) 
([]*modelv1.TagValue, error) {
        entityMap := make(map[string]int)
-       entity := make([]tsdb.Entry, 1+1+len(entityList))
+       entity := make([]*modelv1.TagValue, 1+1+len(entityList))
        // sortDirection
-       entity[0] = convert.Int64ToBytes(int64(uls.sort.Number()))
+       entity[0] = &modelv1.TagValue{
+               Value: &modelv1.TagValue_Int{
+                       Int: &modelv1.Int{
+                               Value: int64(uls.sort),
+                       },
+               },
+       }
        // rankNumber
-       entity[1] = tsdb.AnyEntry
+       entity[1] = pbv1.AnyTagValue
        for idx, tagName := range entityList {
                entityMap[tagName] = idx + 2
                // allow to make fuzzy search with partial conditions
-               entity[idx+2] = tsdb.AnyEntry
+               entity[idx+2] = pbv1.AnyTagValue
        }
        for _, pairQuery := range uls.conditions {
                if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {
                        return nil, errors.Errorf("tag belongs to the entity 
only supports EQ operation in condition(%v)", pairQuery)
                }
                if entityIdx, ok := entityMap[pairQuery.GetName()]; ok {
-                       switch v := pairQuery.GetValue().GetValue().(type) {
-                       case *modelv1.TagValue_Str:
-                               entity[entityIdx] = []byte(v.Str.GetValue())
-                       case *modelv1.TagValue_Int:
-                               entity[entityIdx] = 
convert.Int64ToBytes(v.Int.GetValue())
+                       entity[entityIdx] = pairQuery.Value
+                       switch pairQuery.GetValue().GetValue().(type) {
+                       case *modelv1.TagValue_Str, *modelv1.TagValue_Int:
+                               entity[entityIdx] = pairQuery.Value
                        default:
                                return nil, errors.New("unsupported condition 
tag type for entity")
                        }
@@ -143,58 +157,30 @@ type localScan struct {
        timeRange            timestamp.TimeRange
        projectionTagsRefs   [][]*logical.TagRef
        projectionFieldsRefs []*logical.FieldRef
-       entity               tsdb.Entity
+       projectionTags       []pbv1.TagProjection
+       projectionFields     []string
+       entity               []*modelv1.TagValue
        sort                 modelv1.Sort
 }
 
 func (i *localScan) Execute(ctx context.Context) (mit executor.MIterator, err 
error) {
-       var seriesList tsdb.SeriesList
+
        ec := executor.FromMeasureExecutionContext(ctx)
-       shards, err := ec.(measure.Measure).CompanionShards(i.metadata)
-       if err != nil {
-               return nil, err
-       }
-       for _, shard := range shards {
-               sl, errInternal := shard.Series().List(context.WithValue(
-                       ctx,
-                       logger.ContextKey,
-                       i.l,
-               ), tsdb.NewPath(i.entity))
-               if errInternal != nil {
-                       return nil, errInternal
-               }
-               seriesList = seriesList.Merge(sl)
-       }
-       if len(seriesList) == 0 {
-               return dummyIter, nil
-       }
-       var builders []logical.SeekerBuilder
-       builders = append(builders, func(builder tsdb.SeekerBuilder) {
-               builder.OrderByTime(i.sort)
+       result, err := ec.Query(ctx, pbv1.MeasureQueryOptions{
+               Name:            i.metadata.GetName(),
+               TimeRange:       &i.timeRange,
+               Entity:          i.entity,
+               Order:           &pbv1.OrderBy{Sort: i.sort},
+               TagProjection:   i.projectionTags,
+               FieldProjection: i.projectionFields,
        })
-       iters, closers, err := logical.ExecuteForShard(ctx, i.l, seriesList, 
i.timeRange, builders...)
        if err != nil {
-               return nil, err
-       }
-       if len(closers) > 0 {
-               defer func(closers []io.Closer) {
-                       for _, c := range closers {
-                               err = multierr.Append(err, c.Close())
-                       }
-               }(closers)
-       }
-
-       if len(iters) == 0 {
-               return dummyIter, nil
+               return nil, fmt.Errorf("failed to query measure: %w", err)
        }
-       tc := transformContext{
-               ec:                   ec,
-               projectionTagsRefs:   i.projectionTagsRefs,
-               projectionFieldsRefs: i.projectionFieldsRefs,
-       }
-       it := logical.NewItemIter(iters, i.sort)
+       return &resultMIterator{
+               results: []pbv1.MeasureQueryResult{result},
+       }, nil
 
-       return newLocalScanIterator(it, tc), nil
 }
 
 func (i *localScan) String() string {
@@ -213,42 +199,3 @@ func (i *localScan) Schema() logical.Schema {
        }
        return 
i.schema.ProjTags(i.projectionTagsRefs...).ProjFields(i.projectionFieldsRefs...)
 }
-
-type localScanIterator struct {
-       inner   sort.Iterator[tsdb.Item]
-       err     error
-       current *measurev1.DataPoint
-       context transformContext
-       num     int
-}
-
-func (lst *localScanIterator) Next() bool {
-       if !lst.inner.Next() || lst.err != nil {
-               return false
-       }
-       nextItem := lst.inner.Val()
-       var err error
-       if lst.current, err = transform(nextItem, lst.context); err != nil {
-               lst.err = multierr.Append(lst.err, err)
-       }
-       lst.num++
-       return true
-}
-
-func (lst *localScanIterator) Current() []*measurev1.DataPoint {
-       if lst.current == nil {
-               return nil
-       }
-       return []*measurev1.DataPoint{lst.current}
-}
-
-func (lst *localScanIterator) Close() error {
-       return multierr.Combine(lst.err, lst.inner.Close())
-}
-
-func newLocalScanIterator(inner sort.Iterator[tsdb.Item], context 
transformContext) executor.MIterator {
-       return &localScanIterator{
-               inner:   inner,
-               context: context,
-       }
-}
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index e0a14c07..cc588e93 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -120,7 +120,7 @@ func (cs *CommonSchema) ProjTags(refs ...[]*TagRef) 
*CommonSchema {
        }
        for projFamilyIdx, refInFamily := range refs {
                for projIdx, ref := range refInFamily {
-                       newCommonSchema.TagSpecMap[ref.Tag.getTagName()] = 
&TagSpec{
+                       newCommonSchema.TagSpecMap[ref.Tag.GetTagName()] = 
&TagSpec{
                                TagFamilyIdx: projFamilyIdx,
                                TagIdx:       projIdx,
                                Spec:         ref.Spec.Spec,
@@ -160,7 +160,7 @@ func (cs *CommonSchema) CreateRef(tags ...[]*Tag) 
([][]*TagRef, error) {
        for i, tagInFamily := range tags {
                var tagRefsInFamily []*TagRef
                for _, tag := range tagInFamily {
-                       if ts, ok := cs.TagSpecMap[tag.getTagName()]; ok {
+                       if ts, ok := cs.TagSpecMap[tag.GetTagName()]; ok {
                                tagRefsInFamily = append(tagRefsInFamily, 
&TagRef{tag, ts})
                        } else {
                                return nil, errors.Wrap(errTagNotDefined, 
tag.GetCompoundName())
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go 
b/pkg/query/logical/stream/stream_plan_tag_filter.go
index f6e2f3f0..89c851df 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -59,7 +59,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) 
(logical.Plan, error)
                entity[idx] = tsdb.AnyEntry
        }
        var err error
-       ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria, 
s, entityDict, entity, false)
+       ctx.filter, ctx.entities, err = 
logical.BuildLocalFilterDeprecated(uis.criteria, s, entityDict, entity, false)
        if err != nil {
                var ge logical.GlobalIndexError
                if errors.As(err, &ge) {

Reply via email to