This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c99420652e256111d0e5cdcf9385fcc4130c7a42
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Nov 21 05:20:09 2022 +0000

    Fix some flaws in kv
    
    * Correct int encoding disorder
    * Add a print context helper for debugging encoding issues
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/kv/badger.go                               |  23 +++++
 banyand/kv/kv.go                                   |   2 +
 banyand/measure/measure_query.go                   |  12 ++-
 banyand/tsdb/block.go                              |   5 +
 banyand/tsdb/series.go                             |   7 +-
 banyand/tsdb/series_seek.go                        |  69 +++++++++++++
 banyand/tsdb/series_seek_sort.go                   |  15 ++-
 pkg/encoding/encoding.go                           |   2 +
 pkg/encoding/int.go                                |  43 +++++---
 pkg/encoding/int_test.go                           | 110 ++++++++++++++-------
 pkg/encoding/plain.go                              |   9 ++
 pkg/index/iterator.go                              |   4 +
 pkg/pb/v1/write.go                                 |   3 +-
 .../measure/measure_plan_indexscan_local.go        |   2 +-
 14 files changed, 244 insertions(+), 62 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 4482721..9ef467b 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -48,6 +48,25 @@ type badgerTSS struct {
        badger.TSet
 }
 
+func (b *badgerTSS) Context(key []byte, ts uint64, n int) (pre Iterator, next 
Iterator) {
+       preOpts := badger.DefaultIteratorOptions
+       preOpts.PrefetchSize = n
+       preOpts.PrefetchValues = false
+       preOpts.Prefix = key
+       preOpts.Reverse = false
+       nextOpts := badger.DefaultIteratorOptions
+       nextOpts.PrefetchSize = n
+       nextOpts.PrefetchValues = false
+       nextOpts.Prefix = key
+       nextOpts.Reverse = true
+       seekKey := y.KeyWithTs(key, ts)
+       preIter := b.db.NewIterator(preOpts)
+       preIter.Seek(seekKey)
+       nextIter := b.db.NewIterator(nextOpts)
+       nextIter.Seek(seekKey)
+       return &iterator{delegated: preIter}, &iterator{delegated: nextIter, 
reverse: true}
+}
+
 func (b *badgerTSS) Stats() (s observability.Statistics) {
        return badgerStats(b.db)
 }
@@ -190,6 +209,10 @@ func (i *iterator) Key() []byte {
        return y.ParseKey(i.delegated.Key())
 }
 
+func (i *iterator) RawKey() []byte {
+       return i.delegated.Key()
+}
+
 func (i *iterator) Val() []byte {
        return y.Copy(i.delegated.Value().Value)
 }
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 90944a2..5dd9e53 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -80,6 +80,7 @@ type TimeSeriesWriter interface {
 type TimeSeriesReader interface {
        // Get a value by its key and timestamp/version
        Get(key []byte, ts uint64) ([]byte, error)
+       Context(key []byte, ts uint64, n int) (pre, next Iterator)
 }
 
 // TimeSeriesStore is time series storage
@@ -140,6 +141,7 @@ type Iterator interface {
        Rewind()
        Seek(key []byte)
        Key() []byte
+       RawKey() []byte
        Val() []byte
        Valid() bool
        Close() error
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..4581f3f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -114,10 +114,14 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, 
error) {
 }
 
 func (s *measure) ParseTagFamily(family string, item tsdb.Item) 
(*modelv1.TagFamily, error) {
-       familyRawBytes, err := item.Family(familyIdentity(family, pbv1.TagFlag))
+       fid := familyIdentity(family, pbv1.TagFlag)
+       familyRawBytes, err := item.Family(fid)
        if err != nil {
                return nil, errors.Wrapf(err, "measure %s.%s parse family %s", 
s.name, s.group, family)
        }
+       if len(familyRawBytes) < 1 {
+               item.PrintContext(s.l.Named("tag-family"), fid, 10)
+       }
        tagFamily := &modelv1.TagFamilyForWrite{}
        err = proto.Unmarshal(familyRawBytes, tagFamily)
        if err != nil {
@@ -155,10 +159,14 @@ func (s *measure) ParseField(name string, item tsdb.Item) 
(*measurev1.DataPoint_
                        break
                }
        }
-       bytes, err := item.Family(familyIdentity(name, 
pbv1.EncoderFieldFlag(fieldSpec, s.interval)))
+       fid := familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, 
s.interval))
+       bytes, err := item.Family(fid)
        if err != nil {
                return nil, err
        }
+       if len(bytes) < 1 {
+               item.PrintContext(s.l.Named("field"), fid, 10)
+       }
        fieldValue, err := pbv1.DecodeFieldValue(bytes, fieldSpec)
        if err != nil {
                return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 5ba3331..bdfde12 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -322,6 +322,7 @@ type BlockDelegate interface {
        writeLSMIndex(fields []index.Field, id common.ItemID) error
        writeInvertedIndex(fields []index.Field, id common.ItemID) error
        dataReader() kv.TimeSeriesReader
+       decoderPool() encoding.SeriesDecoderPool
        lsmIndexReader() index.Searcher
        invertedIndexReader() index.Searcher
        primaryIndexReader() index.FieldIterable
@@ -336,6 +337,10 @@ type bDelegate struct {
        delegate *block
 }
 
+func (d *bDelegate) decoderPool() encoding.SeriesDecoderPool {
+       return d.delegate.encodingMethod.DecoderPool
+}
+
 func (d *bDelegate) dataReader() kv.TimeSeriesReader {
        return d.delegate.store
 }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 09d62aa..4d44f30 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -104,9 +104,10 @@ func (s *series) Get(ctx context.Context, id GlobalItemID) 
(Item, io.Closer, err
                return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", 
id)
        }
        return &item{
-               data:     b.dataReader(),
-               itemID:   id.ID,
-               seriesID: s.id,
+               data:        b.dataReader(),
+               itemID:      id.ID,
+               seriesID:    s.id,
+               decoderPool: b.decoderPool(),
        }, b, nil
 }
 
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index b3f08e1..18db6e2 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,11 +18,19 @@
 package tsdb
 
 import (
+       "bytes"
+       "encoding/hex"
+       "time"
+
+       "github.com/dgraph-io/badger/v3/y"
+
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/kv"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type Iterator interface {
@@ -33,6 +41,7 @@ type Iterator interface {
 
 type Item interface {
        Family(family []byte) ([]byte, error)
+       PrintContext(l *logger.Logger, family []byte, n int)
        Val() ([]byte, error)
        ID() common.ItemID
        SortedField() []byte
@@ -59,6 +68,7 @@ type seekerBuilder struct {
        order               modelv1.Sort
        indexRuleForSorting *databasev1.IndexRule
        rangeOptsForSorting index.RangeOpts
+       l                   *logger.Logger
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
@@ -75,6 +85,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
 func newSeekerBuilder(s *seriesSpan) SeekerBuilder {
        return &seekerBuilder{
                seriesSpan: s,
+               l:          logger.GetLogger("seeker-builder"),
        }
 }
 
@@ -101,6 +112,7 @@ type item struct {
        data        kv.TimeSeriesReader
        seriesID    common.SeriesID
        sortedField []byte
+       decoderPool encoding.SeriesDecoderPool
 }
 
 func (i *item) Time() uint64 {
@@ -119,6 +131,63 @@ func (i *item) Family(family []byte) ([]byte, error) {
        return i.data.Get(d.marshal(), uint64(i.itemID))
 }
 
+func (i *item) PrintContext(l *logger.Logger, family []byte, n int) {
+       decoder := i.decoderPool.Get(family)
+       defer i.decoderPool.Put(decoder)
+       d := dataBucket{
+               seriesID: i.seriesID,
+               family:   family,
+       }
+       key := d.marshal()
+       pre, next := i.data.Context(key, uint64(i.itemID), n)
+       defer pre.Close()
+       defer next.Close()
+       j := 0
+       currentTS := uint64(i.itemID)
+
+       each := func(iter kv.Iterator) {
+               if !bytes.Equal(key, iter.Key()) {
+                       return
+               }
+               j++
+
+               ts := y.ParseTs(iter.RawKey())
+
+               logEvent := l.Info().Int("i", j).
+                       Time("ts", time.Unix(0, 
int64(y.ParseTs(iter.RawKey()))))
+               if err := decoder.Decode(family, iter.Val()); err != nil {
+                       logEvent = logEvent.Str("loc", "mem")
+                       if ts == currentTS {
+                               logEvent = logEvent.Bool("at", true)
+                       }
+               } else {
+                       start, end := decoder.Range()
+                       logEvent = logEvent.Time("start", time.Unix(0, 
int64(start))).
+                               Time("end", time.Unix(0, 
int64(end))).Int("num", decoder.Len()).Str("loc", "table")
+                       if start <= currentTS && currentTS <= end {
+                               if dd, err := decoder.Get(currentTS); err == 
nil && len(dd) > 0 {
+                                       logEvent = logEvent.Bool("at", true)
+                               }
+                       }
+               }
+               logEvent.Send()
+       }
+
+       s := hex.EncodeToString(key)
+       if len(s) > 7 {
+               s = s[:7]
+       }
+       l.Info().Str("prefix", s).Time("ts", time.Unix(0, 
int64(i.itemID))).Msg("print previous lines")
+       for ; pre.Valid() && j < n; pre.Next() {
+               each(pre)
+       }
+       j = 0
+       l.Info().Str("prefix", s).Time("ts", time.Unix(0, 
int64(i.itemID))).Msg("print next lines")
+       for ; next.Valid() && j < n; next.Next() {
+               each(next)
+       }
+}
+
 func (i *item) Val() ([]byte, error) {
        d := dataBucket{
                seriesID: i.seriesID,
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 76bc194..910499e 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -28,6 +28,7 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -88,7 +89,8 @@ func (s *seekerBuilder) buildSeriesByIndex() (series 
[]Iterator, err error) {
                        return nil, err
                }
                if inner != nil {
-                       series = append(series, 
newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), 
s.seriesSpan.seriesID, filters))
+                       series = append(series, 
newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+                               s.seriesSpan.seriesID, filters))
                }
        }
        return
@@ -134,9 +136,11 @@ func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, 
error) {
                                return nil, err
                        }
                        if filter == nil {
-                               delegated = append(delegated, 
newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), 
s.seriesSpan.seriesID, emptyFilters))
+                               delegated = append(delegated, 
newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+                                       s.seriesSpan.seriesID, emptyFilters))
                        } else {
-                               delegated = append(delegated, 
newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), 
s.seriesSpan.seriesID, []filterFn{filter}))
+                               delegated = append(delegated, 
newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+                                       s.seriesSpan.seriesID, 
[]filterFn{filter}))
                        }
                }
        }
@@ -156,6 +160,7 @@ type searcherIterator struct {
        curKey        []byte
        cur           posting.Iterator
        data          kv.TimeSeriesReader
+       decoderPool   encoding.SeriesDecoderPool
        seriesID      common.SeriesID
        filters       []filterFn
        l             *logger.Logger
@@ -193,6 +198,7 @@ func (s *searcherIterator) Val() Item {
                itemID:      s.cur.Current(),
                data:        s.data,
                seriesID:    s.seriesID,
+               decoderPool: s.decoderPool,
        }
 }
 
@@ -201,7 +207,7 @@ func (s *searcherIterator) Close() error {
 }
 
 func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, 
data kv.TimeSeriesReader,
-       seriesID common.SeriesID, filters []filterFn,
+       decoderPool encoding.SeriesDecoderPool, seriesID common.SeriesID, 
filters []filterFn,
 ) Iterator {
        return &searcherIterator{
                fieldIterator: fieldIterator,
@@ -209,6 +215,7 @@ func newSearcherIterator(l *logger.Logger, fieldIterator 
index.FieldIterator, da
                seriesID:      seriesID,
                filters:       filters,
                l:             l,
+               decoderPool:   decoderPool,
        }
 }
 
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 0382ee2..efc1f6e 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -79,6 +79,8 @@ type SeriesDecoder interface {
        Get(ts uint64) ([]byte, error)
        // Iterator returns a SeriesIterator
        Iterator() SeriesIterator
+       // Range returns the start and end time of this series
+       Range() (start, end uint64)
 }
 
 // SeriesIterator iterates time series data
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index c9e1991..c9a48a4 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -138,8 +138,10 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
        if ie.startTime == 0 {
                ie.startTime = ts
                ie.prevTime = ts
+       } else if ie.startTime > ts {
+               ie.startTime = ts
        }
-       gap := int(ts) - int(ie.prevTime)
+       gap := int(ie.prevTime) - int(ts)
        if gap < 0 {
                return
        }
@@ -166,13 +168,15 @@ func (ie *intEncoder) Reset(key []byte) {
        ie.interval = ie.fn(key)
        ie.startTime = 0
        ie.prevTime = 0
+       ie.num = 0
+       ie.values = NewXOREncoder(ie.bw)
 }
 
 func (ie *intEncoder) Encode() ([]byte, error) {
        ie.bw.Flush()
        buffWriter := buffer.NewBufferWriter(ie.buff)
        buffWriter.PutUint64(ie.startTime)
-       buffWriter.PutUint16(uint16(ie.size))
+       buffWriter.PutUint16(uint16(ie.num))
        bb := buffWriter.Bytes()
        encodedSize.WithLabelValues(ie.name, "int").Add(float64(len(bb)))
        return bb, nil
@@ -195,6 +199,9 @@ type intDecoder struct {
 }
 
 func (i *intDecoder) Decode(key, data []byte) error {
+       if len(data) < 10 {
+               return ErrInvalidValue
+       }
        i.interval = i.fn(key)
        i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : 
len(data)-2])
        i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -219,29 +226,33 @@ func (i intDecoder) Get(ts uint64) ([]byte, error) {
        return zeroBytes, nil
 }
 
+func (i intDecoder) Range() (start, end uint64) {
+       return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
+}
+
 func (i intDecoder) Iterator() SeriesIterator {
        br := bit.NewReader(bytes.NewReader(i.area))
        return &intIterator{
-               startTime: i.startTime,
-               interval:  int(i.interval),
-               br:        br,
-               values:    NewXORDecoder(br),
-               size:      i.size,
+               endTime:  i.startTime + uint64(i.num*int(i.interval)),
+               interval: int(i.interval),
+               br:       br,
+               values:   NewXORDecoder(br),
+               size:     i.num,
        }
 }
 
 var (
        _         SeriesIterator = (*intIterator)(nil)
-       zeroBytes                = convert.Int64ToBytes(0)
-       Zero                     = convert.BytesToUint64(zeroBytes)
+       zeroBytes                = convert.Uint64ToBytes(zero)
+       zero                     = 
convert.BytesToUint64(convert.Int64ToBytes(0))
 )
 
 type intIterator struct {
-       startTime uint64
-       interval  int
-       size      int
-       br        *bit.Reader
-       values    *XORDecoder
+       endTime  uint64
+       interval int
+       size     int
+       br       *bit.Reader
+       values   *XORDecoder
 
        currVal  uint64
        currTime uint64
@@ -266,10 +277,10 @@ func (i *intIterator) Next() bool {
                        i.currVal = i.values.Value()
                }
        } else {
-               i.currVal = Zero
+               i.currVal = zero
        }
-       i.currTime = i.startTime + uint64(i.interval*i.index)
        i.index++
+       i.currTime = i.endTime - uint64(i.interval*i.index)
        return true
 }
 
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
index 657bbe5..1f08278 100644
--- a/pkg/encoding/int_test.go
+++ b/pkg/encoding/int_test.go
@@ -28,8 +28,10 @@ import (
 
 func TestNewIntEncoderAndDecoder(t *testing.T) {
        type tsData struct {
-               ts   []uint64
-               data []int64
+               ts    []uint64
+               data  []int64
+               start uint64
+               end   uint64
        }
        tests := []struct {
                name string
@@ -39,45 +41,53 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
                {
                        name: "golden path",
                        args: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
                                data: []int64{7, 8, 7, 9},
                        },
                        want: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-                               data: []int64{7, 8, 7, 9},
+                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+                               data:  []int64{7, 8, 7, 9},
+                               start: uint64(time.Minute),
+                               end:   uint64(4 * time.Minute),
                        },
                },
                {
                        name: "more than the size",
                        args: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * 
time.Minute)},
+                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
                                data: []int64{7, 8, 7, 9, 6},
                        },
                        want: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-                               data: []int64{7, 8, 7, 9},
+                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+                               data:  []int64{7, 8, 7, 9},
+                               start: uint64(time.Minute),
+                               end:   uint64(4 * time.Minute),
                        },
                },
                {
                        name: "less than the size",
                        args: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute)},
+                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
                                data: []int64{7, 8, 7},
                        },
                        want: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute)},
-                               data: []int64{7, 8, 7},
+                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
+                               data:  []int64{7, 8, 7},
+                               start: uint64(time.Minute),
+                               end:   uint64(3 * time.Minute),
                        },
                },
                {
                        name: "empty slot in the middle",
                        args: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(4 * 
time.Minute)},
+                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
                                data: []int64{7, 9},
                        },
                        want: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-                               data: []int64{7, 0, 0, 9},
+                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+                               data:  []int64{7, 0, 0, 9},
+                               start: uint64(time.Minute),
+                               end:   uint64(4 * time.Minute),
                        },
                },
        }
@@ -93,36 +103,49 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        at := assert.New(t)
                        encoder := encoderPool.Get(key)
+                       defer encoderPool.Put(encoder)
                        decoder := decoderPool.Get(key)
+                       defer decoderPool.Put(decoder)
                        encoder.Reset(key)
+                       isFull := false
                        for i, v := range tt.args.ts {
                                encoder.Append(v, 
convert.Int64ToBytes(tt.args.data[i]))
                                if encoder.IsFull() {
+                                       isFull = true
                                        break
                                }
                        }
                        bb, err := encoder.Encode()
                        at.NoError(err)
+
+                       at.Equal(tt.want.start, encoder.StartTime())
                        at.NoError(decoder.Decode(key, bb))
-                       at.True(decoder.IsFull())
-                       iter := decoder.Iterator()
-                       for i, t := range tt.want.ts {
-                               at.True(iter.Next())
+                       start, end := decoder.Range()
+                       at.Equal(tt.want.start, start)
+                       at.Equal(tt.want.end, end)
+                       if isFull {
+                               at.True(decoder.IsFull())
+                       }
+                       i := 0
+                       for iter := decoder.Iterator(); iter.Next(); i++ {
                                at.NoError(iter.Error())
                                at.Equal(tt.want.ts[i], iter.Time())
                                at.Equal(tt.want.data[i], 
convert.BytesToInt64(iter.Val()))
-                               v, err := decoder.Get(t)
+                               v, err := decoder.Get(iter.Time())
                                at.NoError(err)
                                at.Equal(tt.want.data[i], 
convert.BytesToInt64(v))
                        }
+                       at.Equal(len(tt.want.ts), i)
                })
        }
 }
 
 func TestNewIntDecoderGet(t *testing.T) {
        type tsData struct {
-               ts   []uint64
-               data []int64
+               ts    []uint64
+               data  []int64
+               start uint64
+               end   uint64
        }
        tests := []struct {
                name string
@@ -132,45 +155,53 @@ func TestNewIntDecoderGet(t *testing.T) {
                {
                        name: "golden path",
                        args: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
                                data: []int64{7, 8, 7, 9},
                        },
                        want: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-                               data: []int64{7, 8, 7, 9},
+                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+                               data:  []int64{7, 8, 7, 9},
+                               start: uint64(time.Minute),
+                               end:   uint64(4 * time.Minute),
                        },
                },
                {
                        name: "more than the size",
                        args: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * 
time.Minute)},
+                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
                                data: []int64{7, 8, 7, 9, 6},
                        },
                        want: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(5 * 
time.Minute)},
-                               data: []int64{7, 8, 7, 9, 0},
+                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+                               data:  []int64{7, 8, 7, 9, 0},
+                               start: uint64(time.Minute),
+                               end:   uint64(4 * time.Minute),
                        },
                },
                {
                        name: "less than the size",
                        args: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute)},
+                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
                                data: []int64{7, 8, 7},
                        },
                        want: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute)},
-                               data: []int64{7, 8, 7},
+                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
+                               data:  []int64{7, 8, 7},
+                               start: uint64(time.Minute),
+                               end:   uint64(3 * time.Minute),
                        },
                },
                {
                        name: "empty slot in the middle",
                        args: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(4 * 
time.Minute)},
+                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
                                data: []int64{7, 9},
                        },
                        want: tsData{
-                               ts:   []uint64{uint64(time.Minute), uint64(2 * 
time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-                               data: []int64{7, 0, 0, 9},
+                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+                               data:  []int64{7, 0, 0, 9},
+                               start: uint64(time.Minute),
+                               end:   uint64(4 * time.Minute),
                        },
                },
        }
@@ -186,18 +217,29 @@ func TestNewIntDecoderGet(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        at := assert.New(t)
                        encoder := encoderPool.Get(key)
+                       defer encoderPool.Put(encoder)
                        decoder := decoderPool.Get(key)
+                       defer decoderPool.Put(decoder)
                        encoder.Reset(key)
+                       isFull := false
                        for i, v := range tt.args.ts {
                                encoder.Append(v, 
convert.Int64ToBytes(tt.args.data[i]))
                                if encoder.IsFull() {
+                                       isFull = true
                                        break
                                }
                        }
                        bb, err := encoder.Encode()
                        at.NoError(err)
+
+                       at.Equal(tt.want.start, encoder.StartTime())
                        at.NoError(decoder.Decode(key, bb))
-                       at.True(decoder.IsFull())
+                       start, end := decoder.Range()
+                       at.Equal(tt.want.start, start)
+                       at.Equal(tt.want.end, end)
+                       if isFull {
+                               at.True(decoder.IsFull())
+                       }
                        for i, t := range tt.want.ts {
                                v, err := decoder.Get(t)
                                at.NoError(err)
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 2284273..388ee8c 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -202,6 +202,9 @@ func (t *plainDecoder) Len() int {
 }
 
 func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
+       if len(rawData) < 2 {
+               return ErrInvalidValue
+       }
        var data []byte
        size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
        if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], 
make([]byte, 0, size)); err != nil {
@@ -242,6 +245,12 @@ func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
        return getVal(t.val, parseOffset(slot))
 }
 
+func (t *plainDecoder) Range() (start, end uint64) {
+       startSlot := getTSSlot(t.ts, int(t.num)-1)
+       endSlot := getTSSlot(t.ts, 0)
+       return parseTS(startSlot), parseTS(endSlot)
+}
+
 func (t *plainDecoder) Iterator() SeriesIterator {
        return newBlockItemIterator(t)
 }
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index fe94b44..8259d7d 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -266,6 +266,10 @@ func (di *delegateIterator) Key() []byte {
        return di.delegated.Key()
 }
 
+func (di *delegateIterator) RawKey() []byte {
+       return di.delegated.RawKey()
+}
+
 func (di *delegateIterator) Field() Field {
        return di.curField
 }
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index f56a68b..3d22f92 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -32,14 +32,13 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/encoding"
 )
 
 type ID string
 
 const fieldFlagLength = 9
 
-var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: int64(encoding.Zero)}}}
+var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: 0}}}
 
 var (
        strDelimiter = []byte("\n")
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index d7722d4..3640595 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -153,7 +153,7 @@ func (i *localIndexScan) Execute(ec 
executor.MeasureExecutionContext) (executor.
                projectionTagsRefs:   i.projectionTagsRefs,
                projectionFieldsRefs: i.projectionFieldsRefs,
        }
-       if len(iters) == 1 || i.groupByEntity {
+       if i.groupByEntity {
                return newSeriesMIterator(iters, transformContext), nil
        }
        c := logical.CreateComparator(i.Sort)

Reply via email to