This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch time-series in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit cd628eaa55cba84f110d92940e4badfdd3dd3fff Author: Gao Hongtao <[email protected]> AuthorDate: Tue Sep 7 11:15:08 2021 +0800 Add stream query test Signed-off-by: Gao Hongtao <[email protected]> --- banyand/kv/badger.go | 36 ++-- banyand/kv/kv.go | 3 +- banyand/stream/stream_query.go | 35 +--- banyand/stream/stream_query_test.go | 301 +++++++++++++++++++++++++++ banyand/stream/stream_write.go | 29 ++- banyand/stream/stream_write_test.go | 2 +- banyand/stream/testdata/multiple_shards.json | 64 ++++++ banyand/stream/testdata/shard0.json | 18 -- banyand/stream/testdata/single_series.json | 51 +++++ banyand/tsdb/series.go | 74 +++++-- banyand/tsdb/series_seek.go | 7 +- banyand/tsdb/series_seek_filter.go | 3 + banyand/tsdb/series_seek_sort.go | 71 ++++--- banyand/tsdb/series_write.go | 5 +- banyand/tsdb/seriesdb.go | 38 +++- banyand/tsdb/seriesdb_test.go | 2 +- banyand/tsdb/shard.go | 4 + banyand/tsdb/tsdb.go | 1 + 18 files changed, 607 insertions(+), 137 deletions(-) diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go index 0dfbbe7..131032d 100644 --- a/banyand/kv/badger.go +++ b/banyand/kv/badger.go @@ -27,7 +27,9 @@ import ( "github.com/dgraph-io/badger/v3/y" "go.uber.org/multierr" + "github.com/apache/skywalking-banyandb/api/common" modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" @@ -196,15 +198,11 @@ func (i *iterator) Seek(key []byte) { } func (i *iterator) Key() []byte { - return i.delegated.Key() + return y.ParseKey(i.delegated.Key()) } -func (i *iterator) Val() posting.List { - list := roaring.NewPostingList() - data := make([]byte, len(i.delegated.Value().Value)) - copy(data, i.delegated.Value().Value) - _ = list.Unmarshall(data) - return list +func (i *iterator) Val() []byte { + return y.Copy(i.delegated.Value().Value) } func (i *iterator) Valid() bool { @@ -286,22 +284,30 @@ var _ index.FieldIterator = (*fIterator)(nil) type fIterator struct { init bool delegate Iterator + curr *index.PostingValue } func (f *fIterator) Next() bool { - if f.init { - f.delegate.Next() - } else { + if !f.init { f.init = true + f.delegate.Rewind() } - return f.delegate.Valid() + if !f.delegate.Valid() { + return false + } + pv := &index.PostingValue{ + Key: f.delegate.Key(), + Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(f.delegate.Val())), + } + for ; f.delegate.Valid() && bytes.Equal(pv.Key, f.delegate.Key()); f.delegate.Next() { + pv.Value.Insert(common.ItemID(convert.BytesToUint64(f.delegate.Val()))) + } + f.curr = pv + return true } func (f *fIterator) Val() *index.PostingValue { - return &index.PostingValue{ - Key: f.delegate.Key(), - Value: f.delegate.Val(), - } + return f.curr } func (f *fIterator) Close() error { diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go index 5cea6ed..4e17456 100644 --- a/banyand/kv/kv.go +++ b/banyand/kv/kv.go @@ -26,7 +26,6 @@ import ( "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" posting2 "github.com/apache/skywalking-banyandb/pkg/posting" ) @@ -109,7 +108,7 @@ type Iterator interface { Rewind() Seek(key []byte) Key() []byte - Val() posting.List + Val() []byte Valid() bool Close() error } diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go index ac7d038..3402665 100644 --- a/banyand/stream/stream_query.go +++ b/banyand/stream/stream_query.go @@ -18,8 +18,6 @@ package stream import ( - "bytes" - "github.com/golang/protobuf/proto" "github.com/pkg/errors" @@ -41,37 +39,22 @@ type Query interface { Stream(stream *commonv2.Metadata) (Stream, error) } -type EqualCondition struct { - tag string - value []byte -} - type Stream interface { - Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error) + Shards(entity tsdb.Entity) ([]tsdb.Shard, error) } var _ Stream = (*stream)(nil) -func (s *stream) Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error) { - entityItemLen := len(s.entityIndex) - entityItems := make([][]byte, entityItemLen) - var entityCount int - for _, ec := range equalConditions { - fi, ti, tag := s.findTagByName(ec.tag) - if tag == nil { - return nil, ErrTagNotExist - } - for i, eIndex := range s.entityIndex { - if eIndex.family == fi && eIndex.tag == ti { - entityItems[i] = ec.value - entityCount++ - } - } - } - if entityCount < entityItemLen { +func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { + if len(entity) < 1 { return s.db.Shards(), nil } - shardID, err := partition.ShardID(bytes.Join(entityItems, nil), s.schema.GetShardNum()) + for _, e := range entity { + if e == nil { + return s.db.Shards(), nil + } + } + shardID, err := partition.ShardID(entity.Marshal(), s.schema.GetShardNum()) if err != nil { return nil, err } diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go new file mode 100644 index 0000000..8b6cc12 --- /dev/null +++ b/banyand/stream/stream_query_test.go @@ -0,0 +1,301 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "bytes" + "embed" + _ "embed" + "encoding/base64" + "encoding/json" + "fmt" + "sort" + "strconv" + "testing" + "time" + + "github.com/golang/protobuf/jsonpb" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/apache/skywalking-banyandb/api/common" + modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2" + streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2" + "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/partition" +) + +func Test_Stream_SelectShard(t *testing.T) { + tester := assert.New(t) + s, deferFunc := setup(tester) + defer deferFunc() + _ = setupQueryData(tester, "multiple_shards.json", s) + tests := []struct { + name string + entity tsdb.Entity + wantShardNum int + wantErr bool + }{ + { + name: "all shards", + wantShardNum: 2, + }, + { + name: "select a shard", + entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)}, + wantShardNum: 1, + }, + { + name: "select shards", + entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.AnyEntry, convert.Int64ToBytes(0)}, + wantShardNum: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + shards, err := s.Shards(tt.entity) + if tt.wantErr { + tester.Error(err) + return + } + tester.NoError(err) + tester.Equal(tt.wantShardNum, len(shards)) + }) + } + +} + +func Test_Stream_Series(t *testing.T) { + tester := assert.New(t) + s, deferFunc := setup(tester) + defer deferFunc() + baseTime := setupQueryData(tester, "multiple_shards.json", s) + type args struct { + entity tsdb.Entity + } + type shardStruct struct { + id common.ShardID + location []string + elements []string + } + type want struct { + shards []shardStruct + } + + tests := []struct { + name string + args args + want want + wantErr bool + }{ + { + name: "all", + args: args{ + entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, + }, + want: want{ + shards: []shardStruct{ + { + id: 0, + location: []string{"series_12243341348514563931", "data_flow_0"}, + elements: []string{"1"}, + }, + { + id: 0, + location: []string{"series_1671844747554927007", "data_flow_0"}, + elements: []string{"2"}, + }, + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"5", "3"}, + }, + { + id: 1, + location: []string{"series_8429137420168685297", "data_flow_0"}, + elements: []string{"4"}, + }, + }, + }, + }, + { + name: "find series by service_id and instance_id", + args: args{ + entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry}, + }, + want: want{ + shards: []shardStruct{ + { + id: 0, + location: []string{"series_12243341348514563931", "data_flow_0"}, + elements: []string{"1"}, + }, + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"5", "3"}, + }, + }, + }, + }, + { + name: "find a series", + args: args{ + entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)}, + }, + want: want{ + shards: []shardStruct{ + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"5", "3"}, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + shards, err := s.Shards(tt.args.entity) + tester.NoError(err) + got := want{ + shards: []shardStruct{}, + } + + for _, shard := range shards { + seriesList, err := shard.Series().List(tsdb.NewPath(tt.args.entity)) + tester.NoError(err) + for _, series := range seriesList { + func(g *want) { + sp, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour)) + defer func(sp tsdb.SeriesSpan) { + _ = sp.Close() + }(sp) + tester.NoError(err) + seeker, err := sp.SeekerBuilder().Build() + tester.NoError(err) + iter, err := seeker.Seek() + tester.NoError(err) + for dataFlowID, iterator := range iter { + var elements []string + for iterator.Next() { + tagFamily, err := s.ParseTagFamily("searchable", iterator.Val()) + tester.NoError(err) + for _, tag := range tagFamily.GetTags() { + if tag.GetKey() == "trace_id" { + elements = append(elements, tag.GetValue().GetStr().GetValue()) + } + } + } + _ = iterator.Close() + g.shards = append(g.shards, shardStruct{ + id: shard.ID(), + location: []string{ + fmt.Sprintf("series_%v", series.ID()), + "data_flow_" + strconv.Itoa(dataFlowID), + }, + elements: elements, + }) + } + + }(&got) + } + } + if tt.wantErr { + tester.Error(err) + return + } + tester.NoError(err) + sort.SliceStable(got.shards, func(i, j int) bool { + a := got.shards[i] + b := got.shards[j] + if a.id > b.id { + return false + } + for i, al := range a.location { + bl := b.location[i] + if bytes.Compare([]byte(al), []byte(bl)) > 0 { + return false + } + } + return true + }) + tester.Equal(tt.want, got) + }) + } + +} + +//go:embed testdata/*.json +var dataFS embed.FS + +func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (baseTime time.Time) { + var templates []interface{} + baseTime = time.Now() + content, err := dataFS.ReadFile("testdata/" + dataFile) + t.NoError(err) + t.NoError(json.Unmarshal(content, &templates)) + bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+") + for i, template := range templates { + rawSearchTagFamily, err := json.Marshal(template) + t.NoError(err) + searchTagFamily := &streamv2.ElementValue_TagFamily{} + t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily)) + e := &streamv2.ElementValue{ + ElementId: strconv.Itoa(i), + Timestamp: timestamppb.New(baseTime.Add(time.Millisecond * time.Duration(i))), + TagFamilies: []*streamv2.ElementValue_TagFamily{ + { + Tags: []*modelv2.TagValue{ + { + Value: &modelv2.TagValue_BinaryData{ + BinaryData: bb, + }, + }, + }, + }, + }, + } + e.TagFamilies = append(e.TagFamilies, searchTagFamily) + entity, err := stream.buildEntity(e) + t.NoError(err) + shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum()) + t.NoError(err) + itemID, err := stream.write(common.ShardID(shardID), e) + t.NoError(err) + sa, err := stream.Shards(entity) + for _, shard := range sa { + se, err := shard.Series().Get(entity) + t.NoError(err) + for { + item, closer, _ := se.Get(*itemID) + rawTagFamily, _ := item.Val("searchable") + if len(rawTagFamily) > 0 { + _ = closer.Close() + break + } + _ = closer.Close() + } + + } + } + return baseTime +} diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go index 2bcd0bf..fa80dc3 100644 --- a/banyand/stream/stream_write.go +++ b/banyand/stream/stream_write.go @@ -40,34 +40,34 @@ var ( ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value") ) -func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) error { +func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*tsdb.GlobalItemID, error) { sm := s.schema fLen := len(value.GetTagFamilies()) if fLen < 1 { - return errors.Wrap(ErrMalformedElement, "no tag family") + return nil, errors.Wrap(ErrMalformedElement, "no tag family") } if fLen > len(sm.TagFamilies) { - return errors.Wrap(ErrMalformedElement, "tag family number is more than expected") + return nil, errors.Wrap(ErrMalformedElement, "tag family number is more than expected") } shard, err := s.db.Shard(shardID) if err != nil { - return err + return nil, err } entity, err := s.buildEntity(value) if err != nil { - return err + return nil, err } series, err := shard.Series().Get(entity) if err != nil { - return err + return nil, err } t := value.GetTimestamp().AsTime() - wp, err := series.Span(tsdb.NewTimeRange(t, 0)) + wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0)) if err != nil { if wp != nil { _ = wp.Close() } - return err + return nil, err } writeFn := func() (tsdb.Writer, error) { builder := wp.WriterBuilder().Time(t) @@ -97,12 +97,18 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err return nil, errWrite } _, errWrite = writer.Write() + s.l.Debug(). + Time("ts", t). + Int("ts_nano", t.Nanosecond()). + Interface("data", value). + Uint64("series_id", uint64(series.ID())). + Msg("write stream") return writer, errWrite } writer, err := writeFn() if err != nil { _ = wp.Close() - return err + return nil, err } m := indexMessage{ localWriter: writer, @@ -117,7 +123,8 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err }() s.indexCh <- m }(m) - return err + itemID := writer.ItemID() + return &itemID, err } func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, err error) { @@ -204,7 +211,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { } sm := writeEvent.WriteRequest.GetMetadata() id := formatStreamID(sm.GetName(), sm.GetGroup()) - err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement()) + _, err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement()) if err != nil { w.l.Debug().Err(err) } diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go index 1853a7f..0ffe0d5 100644 --- a/banyand/stream/stream_write_test.go +++ b/banyand/stream/stream_write_test.go @@ -197,7 +197,7 @@ func Test_Stream_Write(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := s.write(common.ShardID(tt.args.shardID), tt.args.ele) + _, err := s.write(common.ShardID(tt.args.shardID), tt.args.ele) if tt.wantErr { tester.Error(err) return diff --git a/banyand/stream/testdata/multiple_shards.json b/banyand/stream/testdata/multiple_shards.json new file mode 100644 index 0000000..791c7c9 --- /dev/null +++ b/banyand/stream/testdata/multiple_shards.json @@ -0,0 +1,64 @@ +[ + { + "tags": [ + {"str":{"value": "1"}}, + {"int":{"value": 0}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.1_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 1000}}, + {"int":{"value": 1622933202000000000}} + ] + }, + { + "tags": [ + {"str":{"value": "2"}}, + {"int":{"value": 0}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.3_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 500}}, + {"int":{"value": 1622933202000000000}} + ] + }, + { + "tags": [ + {"str":{"value": "3"}}, + {"int":{"value": 1}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.1_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 30}}, + {"int":{"value": 1622933202000000000}}, + {"str":{"value": "GET"}}, + {"str":{"value": "500"}} + ] + }, + { + "tags": [ + {"str":{"value": "4"}}, + {"int":{"value": 1}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.5_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 60}}, + {"int":{"value": 1622933202000000000}}, + {"str":{"value": "GET"}}, + {"str":{"value": "400"}} + ] + }, + { + "tags": [ + {"str":{"value": "5"}}, + {"int":{"value": 1}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.1_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 300}}, + {"int":{"value": 1622933202000000000}}, + {"str":{"value": "GET"}}, + {"str":{"value": "500"}} + ] + } + +] \ No newline at end of file diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json deleted file mode 100644 index 28de2f3..0000000 --- a/banyand/stream/testdata/shard0.json +++ /dev/null @@ -1,18 +0,0 @@ -[ - { - "element_id": "1", - "timestamp": "2021-04-15T01:30:15.01Z", - "tag_families": [ - { - "tags": [ - {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"} - ] - }, - { - "tags": [ - {"str": ""} - ] - } - ] - } -] \ No newline at end of file diff --git a/banyand/stream/testdata/single_series.json b/banyand/stream/testdata/single_series.json new file mode 100644 index 0000000..049ec0c --- /dev/null +++ b/banyand/stream/testdata/single_series.json @@ -0,0 +1,51 @@ +[ + { + "tags": [ + {"str":{"value": "trace_id-xxfff.111323"}}, + {"int":{"value": 0}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.1_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 1000}}, + {"int":{"value": 1622933202000000000}} + ] + }, + { + "tags": [ + {"str":{"value": "trace_id-xxfff.111323"}}, + {"int":{"value": 0}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.1_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 500}}, + {"int":{"value": 1622933202000000000}} + ] + }, + { + "tags": [ + {"str":{"value": "trace_id-xxfff.111323"}}, + {"int":{"value": 0}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.1_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 30}}, + {"int":{"value": 1622933202000000000}}, + {"str":{"value": "GET"}}, + {"str":{"value": "200"}} + ] + }, + { + "tags": [ + {"str":{"value": "trace_id-xxfff.111323"}}, + {"int":{"value": 1}}, + {"str":{"value": "httpserver_id"}}, + {"str":{"value": "10.0.0.1_id"}}, + {"str":{"value": "/home_id"}}, + {"int":{"value": 300}}, + {"int":{"value": 1622933202000000000}}, + {"str":{"value": "GET"}}, + {"str":{"value": "500"}} + ] + } + +] \ No newline at end of file diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go index efdf8ba..376e39c 100644 --- a/banyand/tsdb/series.go +++ b/banyand/tsdb/series.go @@ -19,6 +19,7 @@ package tsdb import ( "bytes" + "context" "io" "time" @@ -27,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/logger" ) var ( @@ -70,9 +72,8 @@ func (i *GlobalItemID) UnMarshal(data []byte) error { } type TimeRange struct { - Start time.Time - Duration time.Duration - End time.Time + Start time.Time + End time.Time } func (t TimeRange) contains(unixNano uint64) bool { @@ -83,18 +84,24 @@ func (t TimeRange) contains(unixNano uint64) bool { return tp.Equal(t.Start) || tp.After(t.Start) } -func NewTimeRange(Start time.Time, Duration time.Duration) TimeRange { +func NewTimeRange(Start, End time.Time) TimeRange { return TimeRange{ - Start: Start, - Duration: Duration, - End: Start.Add(Duration), + Start: Start, + End: End, + } +} + +func NewTimeRangeDuration(Start time.Time, Duration time.Duration) TimeRange { + return TimeRange{ + Start: Start, + End: Start.Add(Duration), } } type Series interface { ID() common.SeriesID Span(timeRange TimeRange) (SeriesSpan, error) - Get(id GlobalItemID) (Item, error) + Get(id GlobalItemID) (Item, io.Closer, error) } type SeriesSpan interface { @@ -109,18 +116,16 @@ type series struct { id common.SeriesID blockDB blockDatabase shardID common.ShardID + l *logger.Logger } -func (s *series) Get(id GlobalItemID) (Item, error) { - panic("implement me") -} - -func newSeries(id common.SeriesID, blockDB blockDatabase) *series { - return &series{ - id: id, - blockDB: blockDB, - shardID: blockDB.shardID(), - } +func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) { + b := s.blockDB.block(id) + return &item{ + data: b.dataReader(), + itemID: id.id, + seriesID: s.id, + }, b, nil } func (s *series) ID() common.SeriesID { @@ -132,7 +137,25 @@ func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) { if len(blocks) < 1 { return nil, ErrEmptySeriesSpan } - return newSeriesSpan(timeRange, blocks, s.id, s.shardID), nil + s.l.Debug(). + Times("time_range", []time.Time{timeRange.Start, timeRange.End}). + Msg("select series span") + return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil +} + +func newSeries(ctx context.Context, id common.SeriesID, blockDB blockDatabase) *series { + s := &series{ + id: id, + blockDB: blockDB, + shardID: blockDB.shardID(), + } + parentLogger := ctx.Value(logger.ContextKey) + if pl, ok := parentLogger.(*logger.Logger); ok { + s.l = pl.Named("series") + } else { + s.l = logger.GetLogger("series") + } + return s } var _ SeriesSpan = (*seriesSpan)(nil) @@ -142,6 +165,7 @@ type seriesSpan struct { seriesID common.SeriesID shardID common.ShardID timeRange TimeRange + l *logger.Logger } func (s *seriesSpan) Close() (err error) { @@ -159,12 +183,18 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder { return newSeekerBuilder(s) } -func newSeriesSpan(timeRange TimeRange, blocks []blockDelegate, - id common.SeriesID, shardID common.ShardID) *seriesSpan { - return &seriesSpan{ +func newSeriesSpan(ctx context.Context, timeRange TimeRange, blocks []blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan { + s := &seriesSpan{ blocks: blocks, seriesID: id, shardID: shardID, timeRange: timeRange, } + parentLogger := ctx.Value(logger.ContextKey) + if pl, ok := parentLogger.(*logger.Logger); ok { + s.l = pl.Named("series_span") + } else { + s.l = logger.GetLogger("series_span") + } + return s } diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go index 478efa0..1c4f338 100644 --- a/banyand/tsdb/series_seek.go +++ b/banyand/tsdb/series_seek.go @@ -63,16 +63,21 @@ type seekerBuilder struct { } func (s *seekerBuilder) Build() (Seeker, error) { + if s.order == modelv2.QueryOrder_SORT_UNSPECIFIED { + s.order = modelv2.QueryOrder_SORT_DESC + } indexFilter, err := s.buildIndexFilter() if err != nil { return nil, err } filters := []filterFn{ - indexFilter, func(item Item) bool { return s.seriesSpan.timeRange.contains(item.Time()) }, } + if indexFilter != nil { + filters = append(filters, indexFilter) + } return newSeeker(s.buildSeries(filters)), nil } diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go index 65147ea..8814a16 100644 --- a/banyand/tsdb/series_seek_filter.go +++ b/banyand/tsdb/series_seek_filter.go @@ -44,6 +44,9 @@ func (s *seekerBuilder) Filter(indexRule *databasev2.IndexRule, condition Condit } func (s *seekerBuilder) buildIndexFilter() (filterFn, error) { + if len(s.conditions) < 1 { + return nil, nil + } var treeIndexCondition, invertedIndexCondition []index.Condition for _, condition := range s.conditions { if len(condition.condition) > 1 { diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go index 1646bce..3b26a8c 100644 --- a/banyand/tsdb/series_seek_sort.go +++ b/banyand/tsdb/series_seek_sort.go @@ -19,6 +19,9 @@ package tsdb import ( "sort" + "time" + + "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2" @@ -26,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" + "github.com/apache/skywalking-banyandb/pkg/logger" ) func (s *seekerBuilder) OrderByIndex(indexRule *databasev2.IndexRule, order modelv2.QueryOrder_Sort) SeekerBuilder { @@ -42,29 +46,20 @@ func (s *seekerBuilder) OrderByTime(order modelv2.QueryOrder_Sort) SeekerBuilder func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator { if s.indexRuleForSorting == nil { - return s.buildSeriesByIndex(filters) + return s.buildSeriesByTime(filters) } - return s.buildSeriesByTime(filters) + return s.buildSeriesByIndex(filters) } func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) { for _, b := range s.seriesSpan.blocks { switch s.indexRuleForSorting.GetType() { case databasev2.IndexRule_TYPE_TREE: - series = append(series, newSearcherIterator( - b.lsmIndexReader(). - FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), - b.dataReader(), - s.seriesSpan.seriesID, - filters, - )) + series = append(series, newSearcherIterator(s.seriesSpan.l, b.lsmIndexReader(). + FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters)) case databasev2.IndexRule_TYPE_INVERTED: - series = append(series, newSearcherIterator(b.invertedIndexReader(). - FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), - b.dataReader(), - s.seriesSpan.seriesID, - filters, - )) + series = append(series, newSearcherIterator(s.seriesSpan.l, b.invertedIndexReader(). + FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters)) } } return @@ -83,15 +78,23 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator { return bb[i].startTime().After(bb[j].startTime()) }) } - delegated := make([]Iterator, len(bb)) + delegated := make([]Iterator, 0, len(bb)) + var bTimes []time.Time for _, b := range bb { - delegated = append(delegated, newSearcherIterator(b. - primaryIndexReader(). - FieldIterator( - s.seriesSpan.seriesID.Marshal(), - s.order, - ), b.dataReader(), s.seriesSpan.seriesID, filters)) + bTimes = append(bTimes, b.startTime()) + delegated = append(delegated, newSearcherIterator( + s.seriesSpan.l, + b.primaryIndexReader(). + FieldIterator( + s.seriesSpan.seriesID.Marshal(), + s.order, + ), b.dataReader(), s.seriesSpan.seriesID, filters)) } + s.seriesSpan.l.Debug(). + Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]). + Times("blocks", bTimes). + Uint64("series_id", uint64(s.seriesSpan.seriesID)). + Msg("seek series by time") return []Iterator{newMergedIterator(delegated)} } @@ -104,6 +107,7 @@ type searcherIterator struct { data kv.TimeSeriesReader seriesID common.SeriesID filters []filterFn + l *logger.Logger } func (s *searcherIterator) Next() bool { @@ -112,20 +116,22 @@ func (s *searcherIterator) Next() bool { v := s.fieldIterator.Val() s.cur = v.Value.Iterator() s.curKey = v.Key + s.l.Trace().Hex("term_field", s.curKey).Msg("got a new field") } else { - _ = s.Close() return false } } if s.cur.Next() { + for _, filter := range s.filters { if !filter(s.Val()) { + s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item") return s.Next() } } + s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("got an item") return true } - _ = s.cur.Close() s.cur = nil return s.Next() } @@ -143,12 +149,14 @@ func (s *searcherIterator) Close() error { return s.fieldIterator.Close() } -func newSearcherIterator(fieldIterator index.FieldIterator, data kv.TimeSeriesReader, seriesID common.SeriesID, filters []filterFn) Iterator { +func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader, + seriesID common.SeriesID, filters []filterFn) Iterator { return &searcherIterator{ fieldIterator: fieldIterator, data: data, seriesID: seriesID, filters: filters, + l: l, } } @@ -158,17 +166,12 @@ type mergedIterator struct { curr Iterator index int delegated []Iterator - closed bool } func (m *mergedIterator) Next() bool { - if m.closed { - return false - } if m.curr == nil { m.index++ if m.index >= len(m.delegated) { - _ = m.Close() return false } else { m.curr = m.delegated[m.index] @@ -176,7 +179,6 @@ func (m *mergedIterator) Next() bool { } hasNext := m.curr.Next() if !hasNext { - _ = m.curr.Close() m.curr = nil return m.Next() } @@ -188,8 +190,11 @@ func (m *mergedIterator) Val() Item { } func (m *mergedIterator) Close() error { - m.closed = true - return nil + var err error + for _, d := range m.delegated { + err = multierr.Append(err, d.Close()) + } + return err } func newMergedIterator(delegated []Iterator) Iterator { diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go index 0f69332..af59b53 100644 --- a/banyand/tsdb/series_write.go +++ b/banyand/tsdb/series_write.go @@ -159,9 +159,10 @@ func (d dataBucket) marshal() []byte { }, nil) } -func (w *writer) Write() (id GlobalItemID, err error) { +func (w *writer) Write() (GlobalItemID, error) { + id := w.ItemID() for _, c := range w.columns { - err = w.block.write(dataBucket{ + err := w.block.write(dataBucket{ seriesID: w.itemID.seriesID, family: c.family, }.marshal(), diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index 24a622f..ccae717 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -41,6 +41,14 @@ type Entry []byte type Entity []Entry +func (e Entity) Marshal() []byte { + data := make([][]byte, len(e)) + for i, entry := range e { + data[i] = entry + } + return bytes.Join(data, nil) +} + type Path struct { prefix []byte mask []byte @@ -86,6 +94,7 @@ type SeriesDatabase interface { type blockDatabase interface { shardID() common.ShardID span(timeRange TimeRange) []blockDelegate + block(id GlobalItemID) blockDelegate } var _ SeriesDatabase = (*seriesDB)(nil) @@ -100,6 +109,10 @@ type seriesDB struct { sID common.ShardID } +func (s *seriesDB) block(id GlobalItemID) blockDelegate { + return s.lst[id.segID].lst[id.blockID].delegate() +} + func (s *seriesDB) shardID() common.ShardID { return s.sID } @@ -111,7 +124,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) { return nil, err } if err == nil { - return newSeries(bytesConvSeriesID(seriesID), s), nil + return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil } s.Lock() defer s.Unlock() @@ -120,7 +133,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) { if err != nil { return nil, err } - return newSeries(bytesConvSeriesID(seriesID), s), nil + return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil } func (s *seriesDB) List(path Path) (SeriesList, error) { @@ -130,8 +143,14 @@ func (s *seriesDB) List(path Path) (SeriesList, error) { return nil, err } if err == nil { - return []Series{newSeries(bytesConvSeriesID(id), s)}, nil + seriesID := bytesConvSeriesID(id) + s.l.Debug(). + Hex("path", path.prefix). + Uint64("series_id", uint64(seriesID)). + Msg("got a series") + return []Series{newSeries(s.context(), seriesID, s)}, nil } + s.l.Debug().Hex("path", path.prefix).Msg("doesn't get any series") return nil, nil } result := make([]Series, 0) @@ -147,7 +166,12 @@ func (s *seriesDB) List(path Path) (SeriesList, error) { err = multierr.Append(err, errGetVal) return nil } - result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s)) + seriesID := bytesConvSeriesID(id) + s.l.Debug(). + Hex("path", path.prefix). + Uint64("series_id", uint64(seriesID)). + Msg("got a series") + result = append(result, newSeries(s.context(), seriesID, s)) } return nil }) @@ -166,6 +190,10 @@ func (s *seriesDB) span(_ TimeRange) []blockDelegate { return result } +func (s *seriesDB) context() context.Context { + return context.WithValue(context.Background(), logger.ContextKey, s.l) +} + func (s *seriesDB) Close() error { for _, seg := range s.lst { seg.close() @@ -183,7 +211,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, return nil, logger.ErrNoLoggerInContext } if pl, ok := parentLogger.(*logger.Logger); ok { - sdb.l = pl.Named("seriesSpan") + sdb.l = pl.Named("series_database") } var err error sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l)) diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go index 3968564..a87d24c 100644 --- a/banyand/tsdb/seriesdb_test.go +++ b/banyand/tsdb/seriesdb_test.go @@ -338,7 +338,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID { } func newMockSeries(id common.SeriesID) *series { - return newSeries(id, nil) + return newSeries(nil, id, nil) } func transform(list SeriesList) (seriesIDs []common.SeriesID) { diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index 3b2ed4e..4bcc40e 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -37,6 +37,10 @@ type shard struct { lst []*segment } +func (s *shard) ID() common.ShardID { + return s.id +} + func (s *shard) Series() SeriesDatabase { return s.seriesDatabase } diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 0d3da45..4528b46 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -60,6 +60,7 @@ type Database interface { type Shard interface { io.Closer + ID() common.ShardID Series() SeriesDatabase Index() IndexDatabase }
