This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cd628eaa55cba84f110d92940e4badfdd3dd3fff
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Sep 7 11:15:08 2021 +0800

    Add stream query test
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/kv/badger.go                         |  36 ++--
 banyand/kv/kv.go                             |   3 +-
 banyand/stream/stream_query.go               |  35 +---
 banyand/stream/stream_query_test.go          | 301 +++++++++++++++++++++++++++
 banyand/stream/stream_write.go               |  29 ++-
 banyand/stream/stream_write_test.go          |   2 +-
 banyand/stream/testdata/multiple_shards.json |  64 ++++++
 banyand/stream/testdata/shard0.json          |  18 --
 banyand/stream/testdata/single_series.json   |  51 +++++
 banyand/tsdb/series.go                       |  74 +++++--
 banyand/tsdb/series_seek.go                  |   7 +-
 banyand/tsdb/series_seek_filter.go           |   3 +
 banyand/tsdb/series_seek_sort.go             |  71 ++++---
 banyand/tsdb/series_write.go                 |   5 +-
 banyand/tsdb/seriesdb.go                     |  38 +++-
 banyand/tsdb/seriesdb_test.go                |   2 +-
 banyand/tsdb/shard.go                        |   4 +
 banyand/tsdb/tsdb.go                         |   1 +
 18 files changed, 607 insertions(+), 137 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 0dfbbe7..131032d 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -27,7 +27,9 @@ import (
        "github.com/dgraph-io/badger/v3/y"
        "go.uber.org/multierr"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
@@ -196,15 +198,11 @@ func (i *iterator) Seek(key []byte) {
 }
 
 func (i *iterator) Key() []byte {
-       return i.delegated.Key()
+       return y.ParseKey(i.delegated.Key())
 }
 
-func (i *iterator) Val() posting.List {
-       list := roaring.NewPostingList()
-       data := make([]byte, len(i.delegated.Value().Value))
-       copy(data, i.delegated.Value().Value)
-       _ = list.Unmarshall(data)
-       return list
+func (i *iterator) Val() []byte {
+       return y.Copy(i.delegated.Value().Value)
 }
 
 func (i *iterator) Valid() bool {
@@ -286,22 +284,30 @@ var _ index.FieldIterator = (*fIterator)(nil)
 type fIterator struct {
        init     bool
        delegate Iterator
+       curr     *index.PostingValue
 }
 
 func (f *fIterator) Next() bool {
-       if f.init {
-               f.delegate.Next()
-       } else {
+       if !f.init {
                f.init = true
+               f.delegate.Rewind()
        }
-       return f.delegate.Valid()
+       if !f.delegate.Valid() {
+               return false
+       }
+       pv := &index.PostingValue{
+               Key:   f.delegate.Key(),
+               Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(f.delegate.Val())),
+       }
+       for ; f.delegate.Valid() && bytes.Equal(pv.Key, f.delegate.Key()); f.delegate.Next() {
+               pv.Value.Insert(common.ItemID(convert.BytesToUint64(f.delegate.Val())))
+       }
+       f.curr = pv
+       return true
 }
 
 func (f *fIterator) Val() *index.PostingValue {
-       return &index.PostingValue{
-               Key:   f.delegate.Key(),
-               Value: f.delegate.Val(),
-       }
+       return f.curr
 }
 
 func (f *fIterator) Close() error {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 5cea6ed..4e17456 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -26,7 +26,6 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
 )
@@ -109,7 +108,7 @@ type Iterator interface {
        Rewind()
        Seek(key []byte)
        Key() []byte
-       Val() posting.List
+       Val() []byte
        Valid() bool
        Close() error
 }
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index ac7d038..3402665 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -18,8 +18,6 @@
 package stream
 
 import (
-       "bytes"
-
        "github.com/golang/protobuf/proto"
        "github.com/pkg/errors"
 
@@ -41,37 +39,22 @@ type Query interface {
        Stream(stream *commonv2.Metadata) (Stream, error)
 }
 
-type EqualCondition struct {
-       tag   string
-       value []byte
-}
-
 type Stream interface {
-       Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error)
+       Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
 }
 
 var _ Stream = (*stream)(nil)
 
-func (s *stream) Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error) {
-       entityItemLen := len(s.entityIndex)
-       entityItems := make([][]byte, entityItemLen)
-       var entityCount int
-       for _, ec := range equalConditions {
-               fi, ti, tag := s.findTagByName(ec.tag)
-               if tag == nil {
-                       return nil, ErrTagNotExist
-               }
-               for i, eIndex := range s.entityIndex {
-                       if eIndex.family == fi && eIndex.tag == ti {
-                               entityItems[i] = ec.value
-                               entityCount++
-                       }
-               }
-       }
-       if entityCount < entityItemLen {
+func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
+       if len(entity) < 1 {
                return s.db.Shards(), nil
        }
-       shardID, err := partition.ShardID(bytes.Join(entityItems, nil), s.schema.GetShardNum())
+       for _, e := range entity {
+               if e == nil {
+                       return s.db.Shards(), nil
+               }
+       }
+       shardID, err := partition.ShardID(entity.Marshal(), s.schema.GetShardNum())
        if err != nil {
                return nil, err
        }
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
new file mode 100644
index 0000000..8b6cc12
--- /dev/null
+++ b/banyand/stream/stream_query_test.go
@@ -0,0 +1,301 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "bytes"
+       "embed"
+       _ "embed"
+       "encoding/base64"
+       "encoding/json"
+       "fmt"
+       "sort"
+       "strconv"
+       "testing"
+       "time"
+
+       "github.com/golang/protobuf/jsonpb"
+       "github.com/stretchr/testify/assert"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+       streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/partition"
+)
+
+func Test_Stream_SelectShard(t *testing.T) {
+       tester := assert.New(t)
+       s, deferFunc := setup(tester)
+       defer deferFunc()
+       _ = setupQueryData(tester, "multiple_shards.json", s)
+       tests := []struct {
+               name         string
+               entity       tsdb.Entity
+               wantShardNum int
+               wantErr      bool
+       }{
+               {
+                       name:         "all shards",
+                       wantShardNum: 2,
+               },
+               {
+                       name:         "select a shard",
+                       entity:       tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
+                       wantShardNum: 1,
+               },
+               {
+                       name:         "select shards",
+                       entity:       tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.AnyEntry, convert.Int64ToBytes(0)},
+                       wantShardNum: 2,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       shards, err := s.Shards(tt.entity)
+                       if tt.wantErr {
+                               tester.Error(err)
+                               return
+                       }
+                       tester.NoError(err)
+                       tester.Equal(tt.wantShardNum, len(shards))
+               })
+       }
+
+}
+
+func Test_Stream_Series(t *testing.T) {
+       tester := assert.New(t)
+       s, deferFunc := setup(tester)
+       defer deferFunc()
+       baseTime := setupQueryData(tester, "multiple_shards.json", s)
+       type args struct {
+               entity tsdb.Entity
+       }
+       type shardStruct struct {
+               id       common.ShardID
+               location []string
+               elements []string
+       }
+       type want struct {
+               shards []shardStruct
+       }
+
+       tests := []struct {
+               name    string
+               args    args
+               want    want
+               wantErr bool
+       }{
+               {
+                       name: "all",
+                       args: args{
+                               entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+                       },
+                       want: want{
+                               shards: []shardStruct{
+                                       {
+                                               id:       0,
+                                               location: []string{"series_12243341348514563931", "data_flow_0"},
+                                               elements: []string{"1"},
+                                       },
+                                       {
+                                               id:       0,
+                                               location: []string{"series_1671844747554927007", "data_flow_0"},
+                                               elements: []string{"2"},
+                                       },
+                                       {
+                                               id:       1,
+                                               location: []string{"series_2374367181827824198", "data_flow_0"},
+                                               elements: []string{"5", "3"},
+                                       },
+                                       {
+                                               id:       1,
+                                               location: []string{"series_8429137420168685297", "data_flow_0"},
+                                               elements: []string{"4"},
+                                       },
+                               },
+                       },
+               },
+               {
+                       name: "find series by service_id and instance_id",
+                       args: args{
+                               entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
+                       },
+                       want: want{
+                               shards: []shardStruct{
+                                       {
+                                               id:       0,
+                                               location: []string{"series_12243341348514563931", "data_flow_0"},
+                                               elements: []string{"1"},
+                                       },
+                                       {
+                                               id:       1,
+                                               location: []string{"series_2374367181827824198", "data_flow_0"},
+                                               elements: []string{"5", "3"},
+                                       },
+                               },
+                       },
+               },
+               {
+                       name: "find a series",
+                       args: args{
+                               entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)},
+                       },
+                       want: want{
+                               shards: []shardStruct{
+                                       {
+                                               id:       1,
+                                               location: []string{"series_2374367181827824198", "data_flow_0"},
+                                               elements: []string{"5", "3"},
+                                       },
+                               },
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       shards, err := s.Shards(tt.args.entity)
+                       tester.NoError(err)
+                       got := want{
+                               shards: []shardStruct{},
+                       }
+
+                       for _, shard := range shards {
+                               seriesList, err := shard.Series().List(tsdb.NewPath(tt.args.entity))
+                               tester.NoError(err)
+                               for _, series := range seriesList {
+                                       func(g *want) {
+                                               sp, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour))
+                                               defer func(sp tsdb.SeriesSpan) {
+                                                       _ = sp.Close()
+                                               }(sp)
+                                               tester.NoError(err)
+                                               seeker, err := sp.SeekerBuilder().Build()
+                                               tester.NoError(err)
+                                               iter, err := seeker.Seek()
+                                               tester.NoError(err)
+                                               for dataFlowID, iterator := range iter {
+                                                       var elements []string
+                                                       for iterator.Next() {
+                                                               tagFamily, err := s.ParseTagFamily("searchable", iterator.Val())
+                                                               tester.NoError(err)
+                                                               for _, tag := range tagFamily.GetTags() {
+                                                                       if tag.GetKey() == "trace_id" {
+                                                                               elements = append(elements, tag.GetValue().GetStr().GetValue())
+                                                                       }
+                                                               }
+                                                       }
+                                                       _ = iterator.Close()
+                                                       g.shards = append(g.shards, shardStruct{
+                                                               id: shard.ID(),
+                                                               location: []string{
+                                                                       fmt.Sprintf("series_%v", series.ID()),
+                                                                       "data_flow_" + strconv.Itoa(dataFlowID),
+                                                               },
+                                                               elements: elements,
+                                                       })
+                                               }
+
+                                       }(&got)
+                               }
+                       }
+                       if tt.wantErr {
+                               tester.Error(err)
+                               return
+                       }
+                       tester.NoError(err)
+                       sort.SliceStable(got.shards, func(i, j int) bool {
+                               a := got.shards[i]
+                               b := got.shards[j]
+                               if a.id > b.id {
+                                       return false
+                               }
+                               for i, al := range a.location {
+                                       bl := b.location[i]
+                                       if bytes.Compare([]byte(al), []byte(bl)) > 0 {
+                                               return false
+                                       }
+                               }
+                               return true
+                       })
+                       tester.Equal(tt.want, got)
+               })
+       }
+
+}
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (baseTime time.Time) {
+       var templates []interface{}
+       baseTime = time.Now()
+       content, err := dataFS.ReadFile("testdata/" + dataFile)
+       t.NoError(err)
+       t.NoError(json.Unmarshal(content, &templates))
+       bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+       for i, template := range templates {
+               rawSearchTagFamily, err := json.Marshal(template)
+               t.NoError(err)
+               searchTagFamily := &streamv2.ElementValue_TagFamily{}
+       t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
+               e := &streamv2.ElementValue{
+                       ElementId: strconv.Itoa(i),
+                       Timestamp: timestamppb.New(baseTime.Add(time.Millisecond * time.Duration(i))),
+                       TagFamilies: []*streamv2.ElementValue_TagFamily{
+                               {
+                                       Tags: []*modelv2.TagValue{
+                                               {
+                                                       Value: &modelv2.TagValue_BinaryData{
+                                                               BinaryData: bb,
+                                                       },
+                                               },
+                                       },
+                               },
+                       },
+               }
+               e.TagFamilies = append(e.TagFamilies, searchTagFamily)
+               entity, err := stream.buildEntity(e)
+               t.NoError(err)
+               shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
+               t.NoError(err)
+               itemID, err := stream.write(common.ShardID(shardID), e)
+               t.NoError(err)
+               sa, err := stream.Shards(entity)
+               for _, shard := range sa {
+                       se, err := shard.Series().Get(entity)
+                       t.NoError(err)
+                       for {
+                               item, closer, _ := se.Get(*itemID)
+                               rawTagFamily, _ := item.Val("searchable")
+                               if len(rawTagFamily) > 0 {
+                                       _ = closer.Close()
+                                       break
+                               }
+                               _ = closer.Close()
+                       }
+
+               }
+       }
+       return baseTime
+}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 2bcd0bf..fa80dc3 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -40,34 +40,34 @@ var (
        ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
 )
 
-func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) error {
+func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*tsdb.GlobalItemID, error) {
        sm := s.schema
        fLen := len(value.GetTagFamilies())
        if fLen < 1 {
-               return errors.Wrap(ErrMalformedElement, "no tag family")
+               return nil, errors.Wrap(ErrMalformedElement, "no tag family")
        }
        if fLen > len(sm.TagFamilies) {
-               return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+               return nil, errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
        }
        shard, err := s.db.Shard(shardID)
        if err != nil {
-               return err
+               return nil, err
        }
        entity, err := s.buildEntity(value)
        if err != nil {
-               return err
+               return nil, err
        }
        series, err := shard.Series().Get(entity)
        if err != nil {
-               return err
+               return nil, err
        }
        t := value.GetTimestamp().AsTime()
-       wp, err := series.Span(tsdb.NewTimeRange(t, 0))
+       wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
                }
-               return err
+               return nil, err
        }
        writeFn := func() (tsdb.Writer, error) {
                builder := wp.WriterBuilder().Time(t)
@@ -97,12 +97,18 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err
                        return nil, errWrite
                }
                _, errWrite = writer.Write()
+               s.l.Debug().
+                       Time("ts", t).
+                       Int("ts_nano", t.Nanosecond()).
+                       Interface("data", value).
+                       Uint64("series_id", uint64(series.ID())).
+                       Msg("write stream")
                return writer, errWrite
        }
        writer, err := writeFn()
        if err != nil {
                _ = wp.Close()
-               return err
+               return nil, err
        }
        m := indexMessage{
                localWriter: writer,
@@ -117,7 +123,8 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err
                }()
                s.indexCh <- m
        }(m)
-       return err
+       itemID := writer.ItemID()
+       return &itemID, err
 }
 
 func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, err error) {
@@ -204,7 +211,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
        }
        sm := writeEvent.WriteRequest.GetMetadata()
        id := formatStreamID(sm.GetName(), sm.GetGroup())
-       err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
+       _, err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
        if err != nil {
                w.l.Debug().Err(err)
        }
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 1853a7f..0ffe0d5 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -197,7 +197,7 @@ func Test_Stream_Write(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
+                       _, err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
                        if tt.wantErr {
                                tester.Error(err)
                                return
diff --git a/banyand/stream/testdata/multiple_shards.json b/banyand/stream/testdata/multiple_shards.json
new file mode 100644
index 0000000..791c7c9
--- /dev/null
+++ b/banyand/stream/testdata/multiple_shards.json
@@ -0,0 +1,64 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.3_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "3"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "4"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.5_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 60}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "400"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "5"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json
deleted file mode 100644
index 28de2f3..0000000
--- a/banyand/stream/testdata/shard0.json
+++ /dev/null
@@ -1,18 +0,0 @@
-[
-  {
-    "element_id": "1",
-    "timestamp": "2021-04-15T01:30:15.01Z",
-    "tag_families": [
-      {
-        "tags": [
-          {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"}
-        ]
-      },
-      {
-        "tags": [
-          {"str": ""}
-        ]
-      }
-    ]
-  }
-]
\ No newline at end of file
diff --git a/banyand/stream/testdata/single_series.json b/banyand/stream/testdata/single_series.json
new file mode 100644
index 0000000..049ec0c
--- /dev/null
+++ b/banyand/stream/testdata/single_series.json
@@ -0,0 +1,51 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "200"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "httpserver_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index efdf8ba..376e39c 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -19,6 +19,7 @@ package tsdb
 
 import (
        "bytes"
+       "context"
        "io"
        "time"
 
@@ -27,6 +28,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var (
@@ -70,9 +72,8 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
 }
 
 type TimeRange struct {
-       Start    time.Time
-       Duration time.Duration
-       End      time.Time
+       Start time.Time
+       End   time.Time
 }
 
 func (t TimeRange) contains(unixNano uint64) bool {
@@ -83,18 +84,24 @@ func (t TimeRange) contains(unixNano uint64) bool {
        return tp.Equal(t.Start) || tp.After(t.Start)
 }
 
-func NewTimeRange(Start time.Time, Duration time.Duration) TimeRange {
+func NewTimeRange(Start, End time.Time) TimeRange {
        return TimeRange{
-               Start:    Start,
-               Duration: Duration,
-               End:      Start.Add(Duration),
+               Start: Start,
+               End:   End,
+       }
+}
+
+func NewTimeRangeDuration(Start time.Time, Duration time.Duration) TimeRange {
+       return TimeRange{
+               Start: Start,
+               End:   Start.Add(Duration),
        }
 }
 
 type Series interface {
        ID() common.SeriesID
        Span(timeRange TimeRange) (SeriesSpan, error)
-       Get(id GlobalItemID) (Item, error)
+       Get(id GlobalItemID) (Item, io.Closer, error)
 }
 
 type SeriesSpan interface {
@@ -109,18 +116,16 @@ type series struct {
        id      common.SeriesID
        blockDB blockDatabase
        shardID common.ShardID
+       l       *logger.Logger
 }
 
-func (s *series) Get(id GlobalItemID) (Item, error) {
-       panic("implement me")
-}
-
-func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
-       return &series{
-               id:      id,
-               blockDB: blockDB,
-               shardID: blockDB.shardID(),
-       }
+func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
+       b := s.blockDB.block(id)
+       return &item{
+               data:     b.dataReader(),
+               itemID:   id.id,
+               seriesID: s.id,
+       }, b, nil
 }
 
 func (s *series) ID() common.SeriesID {
@@ -132,7 +137,25 @@ func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
        if len(blocks) < 1 {
                return nil, ErrEmptySeriesSpan
        }
-       return newSeriesSpan(timeRange, blocks, s.id, s.shardID), nil
+       s.l.Debug().
+               Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
+               Msg("select series span")
+       return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
+}
+
+func newSeries(ctx context.Context, id common.SeriesID, blockDB blockDatabase) *series {
+       s := &series{
+               id:      id,
+               blockDB: blockDB,
+               shardID: blockDB.shardID(),
+       }
+       parentLogger := ctx.Value(logger.ContextKey)
+       if pl, ok := parentLogger.(*logger.Logger); ok {
+               s.l = pl.Named("series")
+       } else {
+               s.l = logger.GetLogger("series")
+       }
+       return s
 }
 
 var _ SeriesSpan = (*seriesSpan)(nil)
@@ -142,6 +165,7 @@ type seriesSpan struct {
        seriesID  common.SeriesID
        shardID   common.ShardID
        timeRange TimeRange
+       l         *logger.Logger
 }
 
 func (s *seriesSpan) Close() (err error) {
@@ -159,12 +183,18 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
        return newSeekerBuilder(s)
 }
 
-func newSeriesSpan(timeRange TimeRange, blocks []blockDelegate,
-       id common.SeriesID, shardID common.ShardID) *seriesSpan {
-       return &seriesSpan{
+func newSeriesSpan(ctx context.Context, timeRange TimeRange, blocks []blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
+       s := &seriesSpan{
                blocks:    blocks,
                seriesID:  id,
                shardID:   shardID,
                timeRange: timeRange,
        }
+       parentLogger := ctx.Value(logger.ContextKey)
+       if pl, ok := parentLogger.(*logger.Logger); ok {
+               s.l = pl.Named("series_span")
+       } else {
+               s.l = logger.GetLogger("series_span")
+       }
+       return s
 }
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 478efa0..1c4f338 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -63,16 +63,21 @@ type seekerBuilder struct {
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
+       if s.order == modelv2.QueryOrder_SORT_UNSPECIFIED {
+               s.order = modelv2.QueryOrder_SORT_DESC
+       }
        indexFilter, err := s.buildIndexFilter()
        if err != nil {
                return nil, err
        }
        filters := []filterFn{
-               indexFilter,
                func(item Item) bool {
                        return s.seriesSpan.timeRange.contains(item.Time())
                },
        }
+       if indexFilter != nil {
+               filters = append(filters, indexFilter)
+       }
        return newSeeker(s.buildSeries(filters)), nil
 }
 
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index 65147ea..8814a16 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -44,6 +44,9 @@ func (s *seekerBuilder) Filter(indexRule *databasev2.IndexRule, condition Condit
 }
 
 func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
+       if len(s.conditions) < 1 {
+               return nil, nil
+       }
        var treeIndexCondition, invertedIndexCondition []index.Condition
        for _, condition := range s.conditions {
                if len(condition.condition) > 1 {
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 1646bce..3b26a8c 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -19,6 +19,9 @@ package tsdb
 
 import (
        "sort"
+       "time"
+
+       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
        databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
@@ -26,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 func (s *seekerBuilder) OrderByIndex(indexRule *databasev2.IndexRule, order modelv2.QueryOrder_Sort) SeekerBuilder {
@@ -42,29 +46,20 @@ func (s *seekerBuilder) OrderByTime(order modelv2.QueryOrder_Sort) SeekerBuilder
 
 func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
        if s.indexRuleForSorting == nil {
-               return s.buildSeriesByIndex(filters)
+               return s.buildSeriesByTime(filters)
        }
-       return s.buildSeriesByTime(filters)
+       return s.buildSeriesByIndex(filters)
 }
 
 func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
        for _, b := range s.seriesSpan.blocks {
                switch s.indexRuleForSorting.GetType() {
                case databasev2.IndexRule_TYPE_TREE:
-                       series = append(series, newSearcherIterator(
-                               b.lsmIndexReader().
-                                       FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order),
-                               b.dataReader(),
-                               s.seriesSpan.seriesID,
-                               filters,
-                       ))
+                       series = append(series, newSearcherIterator(s.seriesSpan.l, b.lsmIndexReader().
+                               FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
                case databasev2.IndexRule_TYPE_INVERTED:
-                       series = append(series, newSearcherIterator(b.invertedIndexReader().
-                               FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order),
-                               b.dataReader(),
-                               s.seriesSpan.seriesID,
-                               filters,
-                       ))
+                       series = append(series, newSearcherIterator(s.seriesSpan.l, b.invertedIndexReader().
+                               FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
                }
        }
        return
@@ -83,15 +78,23 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
                        return bb[i].startTime().After(bb[j].startTime())
                })
        }
-       delegated := make([]Iterator, len(bb))
+       delegated := make([]Iterator, 0, len(bb))
+       var bTimes []time.Time
        for _, b := range bb {
-               delegated = append(delegated, newSearcherIterator(b.
-                       primaryIndexReader().
-                       FieldIterator(
-                               s.seriesSpan.seriesID.Marshal(),
-                               s.order,
-                       ), b.dataReader(), s.seriesSpan.seriesID, filters))
+               bTimes = append(bTimes, b.startTime())
+               delegated = append(delegated, newSearcherIterator(
+                       s.seriesSpan.l,
+                       b.primaryIndexReader().
+                               FieldIterator(
+                                       s.seriesSpan.seriesID.Marshal(),
+                                       s.order,
+                               ), b.dataReader(), s.seriesSpan.seriesID, filters))
        }
+       s.seriesSpan.l.Debug().
+               Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]).
+               Times("blocks", bTimes).
+               Uint64("series_id", uint64(s.seriesSpan.seriesID)).
+               Msg("seek series by time")
        return []Iterator{newMergedIterator(delegated)}
 }
 
@@ -104,6 +107,7 @@ type searcherIterator struct {
        data          kv.TimeSeriesReader
        seriesID      common.SeriesID
        filters       []filterFn
+       l             *logger.Logger
 }
 
 func (s *searcherIterator) Next() bool {
@@ -112,20 +116,22 @@ func (s *searcherIterator) Next() bool {
                        v := s.fieldIterator.Val()
                        s.cur = v.Value.Iterator()
                        s.curKey = v.Key
+                       s.l.Trace().Hex("term_field", s.curKey).Msg("got a new field")
                } else {
-                       _ = s.Close()
                        return false
                }
        }
        if s.cur.Next() {
+
                for _, filter := range s.filters {
                        if !filter(s.Val()) {
+                               s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item")
                                return s.Next()
                        }
                }
+               s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("got an item")
                return true
        }
-       _ = s.cur.Close()
        s.cur = nil
        return s.Next()
 }
@@ -143,12 +149,14 @@ func (s *searcherIterator) Close() error {
        return s.fieldIterator.Close()
 }
 
-func newSearcherIterator(fieldIterator index.FieldIterator, data kv.TimeSeriesReader, seriesID common.SeriesID, filters []filterFn) Iterator {
+func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
+       seriesID common.SeriesID, filters []filterFn) Iterator {
        return &searcherIterator{
                fieldIterator: fieldIterator,
                data:          data,
                seriesID:      seriesID,
                filters:       filters,
+               l:             l,
        }
 }
 
@@ -158,17 +166,12 @@ type mergedIterator struct {
        curr      Iterator
        index     int
        delegated []Iterator
-       closed    bool
 }
 
 func (m *mergedIterator) Next() bool {
-       if m.closed {
-               return false
-       }
        if m.curr == nil {
                m.index++
                if m.index >= len(m.delegated) {
-                       _ = m.Close()
                        return false
                } else {
                        m.curr = m.delegated[m.index]
@@ -176,7 +179,6 @@ func (m *mergedIterator) Next() bool {
        }
        hasNext := m.curr.Next()
        if !hasNext {
-               _ = m.curr.Close()
                m.curr = nil
                return m.Next()
        }
@@ -188,8 +190,11 @@ func (m *mergedIterator) Val() Item {
 }
 
 func (m *mergedIterator) Close() error {
-       m.closed = true
-       return nil
+       var err error
+       for _, d := range m.delegated {
+               err = multierr.Append(err, d.Close())
+       }
+       return err
 }
 
 func newMergedIterator(delegated []Iterator) Iterator {
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 0f69332..af59b53 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -159,9 +159,10 @@ func (d dataBucket) marshal() []byte {
        }, nil)
 }
 
-func (w *writer) Write() (id GlobalItemID, err error) {
+func (w *writer) Write() (GlobalItemID, error) {
+       id := w.ItemID()
        for _, c := range w.columns {
-               err = w.block.write(dataBucket{
+               err := w.block.write(dataBucket{
                        seriesID: w.itemID.seriesID,
                        family:   c.family,
                }.marshal(),
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 24a622f..ccae717 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -41,6 +41,14 @@ type Entry []byte
 
 type Entity []Entry
 
+func (e Entity) Marshal() []byte {
+       data := make([][]byte, len(e))
+       for i, entry := range e {
+               data[i] = entry
+       }
+       return bytes.Join(data, nil)
+}
+
 type Path struct {
        prefix   []byte
        mask     []byte
@@ -86,6 +94,7 @@ type SeriesDatabase interface {
 type blockDatabase interface {
        shardID() common.ShardID
        span(timeRange TimeRange) []blockDelegate
+       block(id GlobalItemID) blockDelegate
 }
 
 var _ SeriesDatabase = (*seriesDB)(nil)
@@ -100,6 +109,10 @@ type seriesDB struct {
        sID            common.ShardID
 }
 
+func (s *seriesDB) block(id GlobalItemID) blockDelegate {
+       return s.lst[id.segID].lst[id.blockID].delegate()
+}
+
 func (s *seriesDB) shardID() common.ShardID {
        return s.sID
 }
@@ -111,7 +124,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) {
                return nil, err
        }
        if err == nil {
-               return newSeries(bytesConvSeriesID(seriesID), s), nil
+               return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil
        }
        s.Lock()
        defer s.Unlock()
@@ -120,7 +133,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) {
        if err != nil {
                return nil, err
        }
-       return newSeries(bytesConvSeriesID(seriesID), s), nil
+       return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil
 }
 
 func (s *seriesDB) List(path Path) (SeriesList, error) {
@@ -130,8 +143,14 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
                        return nil, err
                }
                if err == nil {
-                       return []Series{newSeries(bytesConvSeriesID(id), s)}, nil
+                       seriesID := bytesConvSeriesID(id)
+                       s.l.Debug().
+                               Hex("path", path.prefix).
+                               Uint64("series_id", uint64(seriesID)).
+                               Msg("got a series")
+                       return []Series{newSeries(s.context(), seriesID, s)}, nil
                }
+               s.l.Debug().Hex("path", path.prefix).Msg("doesn't get any series")
                return nil, nil
        }
        result := make([]Series, 0)
@@ -147,7 +166,12 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
                                err = multierr.Append(err, errGetVal)
                                return nil
                        }
-                       result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s))
+                       seriesID := bytesConvSeriesID(id)
+                       s.l.Debug().
+                               Hex("path", path.prefix).
+                               Uint64("series_id", uint64(seriesID)).
+                               Msg("got a series")
+                       result = append(result, newSeries(s.context(), seriesID, s))
                }
                return nil
        })
@@ -166,6 +190,10 @@ func (s *seriesDB) span(_ TimeRange) []blockDelegate {
        return result
 }
 
+func (s *seriesDB) context() context.Context {
+       return context.WithValue(context.Background(), logger.ContextKey, s.l)
+}
+
 func (s *seriesDB) Close() error {
        for _, seg := range s.lst {
                seg.close()
@@ -183,7 +211,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string,
                return nil, logger.ErrNoLoggerInContext
        }
        if pl, ok := parentLogger.(*logger.Logger); ok {
-               sdb.l = pl.Named("seriesSpan")
+               sdb.l = pl.Named("series_database")
        }
        var err error
        sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", 
kv.StoreWithNamedLogger("metadata", sdb.l))
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 3968564..a87d24c 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -338,7 +338,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 }
 
 func newMockSeries(id common.SeriesID) *series {
-       return newSeries(id, nil)
+       return newSeries(nil, id, nil)
 }
 
 func transform(list SeriesList) (seriesIDs []common.SeriesID) {
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3b2ed4e..4bcc40e 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -37,6 +37,10 @@ type shard struct {
        lst            []*segment
 }
 
+func (s *shard) ID() common.ShardID {
+       return s.id
+}
+
 func (s *shard) Series() SeriesDatabase {
        return s.seriesDatabase
 }
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 0d3da45..4528b46 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -60,6 +60,7 @@ type Database interface {
 
 type Shard interface {
        io.Closer
+       ID() common.ShardID
        Series() SeriesDatabase
        Index() IndexDatabase
 }
