This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch time-series in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e8df5deb25e057b4233a63f93736dd5b826d9902 Author: Gao Hongtao <[email protected]> AuthorDate: Tue Sep 7 17:51:01 2021 +0800 Finish test Signed-off-by: Gao Hongtao <[email protected]> --- banyand/kv/badger.go | 3 + banyand/stream/stream_query.go | 5 + banyand/stream/stream_query_test.go | 451 ++++++++++++++++----- .../{single_series.json => global_index.json} | 33 +- banyand/stream/testdata/multiple_shards.json | 6 +- banyand/tsdb/block.go | 14 +- banyand/tsdb/indexdb.go | 6 +- banyand/tsdb/series.go | 22 +- banyand/tsdb/series_seek.go | 9 +- banyand/tsdb/series_seek_filter.go | 6 +- banyand/tsdb/series_seek_sort.go | 40 +- banyand/tsdb/series_write.go | 20 +- banyand/tsdb/seriesdb.go | 5 + banyand/tsdb/seriesdb_test.go | 2 +- pkg/index/index.go | 4 - pkg/index/inverted/field_map.go | 23 +- pkg/index/inverted/inverted.go | 5 - pkg/index/inverted/mem.go | 14 +- pkg/index/inverted/mem_test.go | 129 ++---- pkg/index/search.go | 4 +- 20 files changed, 505 insertions(+), 296 deletions(-) diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go index 131032d..a3767d5 100644 --- a/banyand/kv/badger.go +++ b/banyand/kv/badger.go @@ -255,6 +255,9 @@ func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error { iter := b.db.NewIterator(badger.DefaultIteratorOptions) var count int for iter.Seek(key); iter.Valid(); iter.Next() { + if !bytes.Equal(y.ParseKey(iter.Key()), key) { + break + } count++ err := applyFn(y.Copy(iter.Value().Value)) if err != nil { diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go index 3402665..5bd9844 100644 --- a/banyand/stream/stream_query.go +++ b/banyand/stream/stream_query.go @@ -41,6 +41,7 @@ type Query interface { type Stream interface { Shards(entity tsdb.Entity) ([]tsdb.Shard, error) + Shard(id common.ShardID) (tsdb.Shard, error) } var _ Stream = (*stream)(nil) @@ -65,6 +66,10 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { return []tsdb.Shard{shard}, nil } +func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) { + return s.db.Shard(id) +} + func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFamily, error) { familyRawBytes, err := item.Val(family) if err != nil { diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go index 8b6cc12..65311ce 100644 --- a/banyand/stream/stream_query_test.go +++ b/banyand/stream/stream_query_test.go @@ -24,23 +24,36 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" "sort" "strconv" "testing" "time" "github.com/golang/protobuf/jsonpb" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/timestamppb" "github.com/apache/skywalking-banyandb/api/common" + commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2" + databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2" modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2" streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/partition" ) +type shardStruct struct { + id common.ShardID + location []string + elements []string +} + +type shardsForTest []shardStruct + func Test_Stream_SelectShard(t *testing.T) { tester := assert.New(t) s, deferFunc := setup(tester) @@ -87,86 +100,184 @@ func Test_Stream_Series(t *testing.T) { s, deferFunc := setup(tester) defer deferFunc() baseTime := setupQueryData(tester, "multiple_shards.json", s) - type args struct { - entity tsdb.Entity - } - type shardStruct struct { - id common.ShardID - location []string - elements []string - } - type want struct { - shards []shardStruct - } - tests := []struct { name string - args args - want want + args queryOpts + want shardsForTest wantErr bool }{ { name: "all", - args: args{ - entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, + args: queryOpts{ + entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, + timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour), }, - want: want{ - shards: []shardStruct{ - { - id: 0, - location: []string{"series_12243341348514563931", "data_flow_0"}, - elements: []string{"1"}, - }, - { - id: 0, - location: []string{"series_1671844747554927007", "data_flow_0"}, - elements: []string{"2"}, - }, - { - id: 1, - location: []string{"series_2374367181827824198", "data_flow_0"}, - elements: []string{"5", "3"}, - }, - { - id: 1, - location: []string{"series_8429137420168685297", "data_flow_0"}, - elements: []string{"4"}, - }, + want: shardsForTest{ + { + id: 0, + location: []string{"series_12243341348514563931", "data_flow_0"}, + elements: []string{"1"}, + }, + { + id: 0, + location: []string{"series_1671844747554927007", "data_flow_0"}, + elements: []string{"2"}, + }, + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"5", "3"}, + }, + { + id: 1, + location: []string{"series_8429137420168685297", "data_flow_0"}, + elements: []string{"4"}, + }, + }, + }, + + { + name: "time range", + args: queryOpts{ + entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, + timeRange: tsdb.NewTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour), + }, + want: shardsForTest{ + { + id: 0, + location: []string{"series_12243341348514563931", "data_flow_0"}, + }, + { + id: 0, + location: []string{"series_1671844747554927007", "data_flow_0"}, + }, + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"5"}, + }, + { + id: 1, + location: []string{"series_8429137420168685297", "data_flow_0"}, + elements: []string{"4"}, }, }, }, { name: "find series by service_id and instance_id", - args: args{ - entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry}, + args: queryOpts{ + entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry}, + timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour), }, - want: want{ - shards: []shardStruct{ - { - id: 0, - location: []string{"series_12243341348514563931", "data_flow_0"}, - elements: []string{"1"}, - }, - { - id: 1, - location: []string{"series_2374367181827824198", "data_flow_0"}, - elements: []string{"5", "3"}, - }, + want: shardsForTest{ + { + id: 0, + location: []string{"series_12243341348514563931", "data_flow_0"}, + elements: []string{"1"}, + }, + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"5", "3"}, }, }, }, { name: "find a series", - args: args{ - entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)}, + args: queryOpts{ + entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)}, + timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour), }, - want: want{ - shards: []shardStruct{ - { - id: 1, - location: []string{"series_2374367181827824198", "data_flow_0"}, - elements: []string{"5", "3"}, - }, + want: shardsForTest{ + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"5", "3"}, + }, + }, + }, + { + name: "filter", + args: queryOpts{ + entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, + timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour), + buildFn: func(builder tsdb.SeekerBuilder) { + builder.Filter(&databasev2.IndexRule{ + Metadata: &commonv2.Metadata{ + Name: "endpoint_id", + Group: "default", + }, + Tags: []string{"endpoint_id"}, + Type: databasev2.IndexRule_TYPE_INVERTED, + Location: databasev2.IndexRule_LOCATION_SERIES, + }, tsdb.Condition{ + "endpoint_id": []index.ConditionValue{ + { + Op: modelv2.Condition_BINARY_OP_EQ, + Values: [][]byte{[]byte("/home_id")}, + }, + }, + }) + }, + }, + want: shardsForTest{ + { + id: 0, + location: []string{"series_12243341348514563931", "data_flow_0"}, + elements: []string{"1"}, + }, + { + id: 0, + location: []string{"series_1671844747554927007", "data_flow_0"}, + }, + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"3"}, + }, + { + id: 1, + location: []string{"series_8429137420168685297", "data_flow_0"}, + }, + }, + }, + { + name: "order by duration", + args: queryOpts{ + entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, + timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour), + buildFn: func(builder tsdb.SeekerBuilder) { + builder.OrderByIndex(&databasev2.IndexRule{ + Metadata: &commonv2.Metadata{ + Name: "duration", + Group: "default", + }, + Tags: []string{"duration"}, + Type: databasev2.IndexRule_TYPE_TREE, + Location: databasev2.IndexRule_LOCATION_SERIES, + }, modelv2.QueryOrder_SORT_ASC) + }, + }, + want: shardsForTest{ + { + id: 0, + location: []string{"series_12243341348514563931", "data_flow_0"}, + elements: []string{"1"}, + }, + { + id: 0, + location: []string{"series_1671844747554927007", "data_flow_0"}, + elements: []string{"2"}, + }, + { + id: 1, + location: []string{"series_2374367181827824198", "data_flow_0"}, + elements: []string{"3", "5"}, + }, + { + id: 1, + location: []string{"series_8429137420168685297", "data_flow_0"}, + elements: []string{"4"}, }, }, }, @@ -174,59 +285,15 @@ func Test_Stream_Series(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - shards, err := s.Shards(tt.args.entity) - tester.NoError(err) - got := want{ - shards: []shardStruct{}, - } - - for _, shard := range shards { - seriesList, err := shard.Series().List(tsdb.NewPath(tt.args.entity)) - tester.NoError(err) - for _, series := range seriesList { - func(g *want) { - sp, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour)) - defer func(sp tsdb.SeriesSpan) { - _ = sp.Close() - }(sp) - tester.NoError(err) - seeker, err := sp.SeekerBuilder().Build() - tester.NoError(err) - iter, err := seeker.Seek() - tester.NoError(err) - for dataFlowID, iterator := range iter { - var elements []string - for iterator.Next() { - tagFamily, err := s.ParseTagFamily("searchable", iterator.Val()) - tester.NoError(err) - for _, tag := range tagFamily.GetTags() { - if tag.GetKey() == "trace_id" { - elements = append(elements, tag.GetValue().GetStr().GetValue()) - } - } - } - _ = iterator.Close() - g.shards = append(g.shards, shardStruct{ - id: shard.ID(), - location: []string{ - fmt.Sprintf("series_%v", series.ID()), - "data_flow_" + strconv.Itoa(dataFlowID), - }, - elements: elements, - }) - } - - }(&got) - } - } + got, err := queryData(tester, s, tt.args) if tt.wantErr { tester.Error(err) return } tester.NoError(err) - sort.SliceStable(got.shards, func(i, j int) bool { - a := got.shards[i] - b := got.shards[j] + sort.SliceStable(got, func(i, j int) bool { + a := got[i] + b := got[j] if a.id > b.id { return false } @@ -244,6 +311,169 @@ func Test_Stream_Series(t *testing.T) { } +func Test_Stream_Global_Index(t *testing.T) { + tester := assert.New(t) + s, deferFunc := setup(tester) + defer deferFunc() + _ = setupQueryData(tester, "global_index.json", s) + tests := []struct { + name string + traceID string + wantTraceSegmentNum int + wantErr bool + }{ + { + name: "trace id is 1", + traceID: "1", + wantTraceSegmentNum: 2, + }, + { + name: "trace id is 2", + traceID: "2", + wantTraceSegmentNum: 3, + }, + { + name: "unknown trace id", + traceID: "foo", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + shards, errShards := s.Shards(nil) + tester.NoError(errShards) + err := func() error { + for _, shard := range shards { + itemIDs, err := shard.Index().Seek(index.Field{ + Term: []byte("trace_id"), + Value: []byte(tt.traceID), + }) + if err != nil { + return errors.WithStack(err) + } + if len(itemIDs) < 1 { + continue + } + if err != nil { + return errors.WithStack(err) + } + tester.Equal(tt.wantTraceSegmentNum, len(itemIDs)) + for _, itemID := range itemIDs { + segShard, err := s.Shard(itemID.ShardID) + if err != nil { + return errors.WithStack(err) + } + series, err := segShard.Series().GetByID(itemID.SeriesID) + if err != nil { + return errors.WithStack(err) + } + err = func() error { + item, closer, errInner := series.Get(itemID) + defer func(closer io.Closer) { + _ = closer.Close() + }(closer) + if errInner != nil { + return errors.WithStack(errInner) + } + tagFamily, errInner := s.ParseTagFamily("searchable", item) + if errInner != nil { + return errors.WithStack(errInner) + } + for _, tag := range tagFamily.GetTags() { + if tag.GetKey() == "trace_id" { + tester.Equal(tt.traceID, tag.GetValue().GetStr().GetValue()) + } + } + return nil + }() + if err != nil { + return errors.WithStack(err) + } + + } + } + return nil + }() + if tt.wantErr { + tester.Error(err) + return + } + tester.NoError(err) + }) + } + +} + +type queryOpts struct { + entity tsdb.Entity + timeRange tsdb.TimeRange + buildFn func(builder tsdb.SeekerBuilder) +} + +func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForTest, error) { + shards, err := s.Shards(opts.entity) + tester.NoError(err) + got := shardsForTest{} + for _, shard := range shards { + seriesList, err := shard.Series().List(tsdb.NewPath(opts.entity)) + if err != nil { + return nil, err + } + for _, series := range seriesList { + got, err = func(g shardsForTest) (shardsForTest, error) { + sp, errInner := series.Span(opts.timeRange) + defer func(sp tsdb.SeriesSpan) { + _ = sp.Close() + }(sp) + if errInner != nil { + return nil, errInner + } + builder := sp.SeekerBuilder() + if opts.buildFn != nil { + opts.buildFn(builder) + } + seeker, errInner := builder.Build() + if errInner != nil { + return nil, errInner + } + iter, errInner := seeker.Seek() + if errInner != nil { + return nil, errInner + } + for dataFlowID, iterator := range iter { + var elements []string + for iterator.Next() { + tagFamily, errInner := s.ParseTagFamily("searchable", iterator.Val()) + if errInner != nil { + return nil, errInner + } + for _, tag := range tagFamily.GetTags() { + if tag.GetKey() == "trace_id" { + elements = append(elements, tag.GetValue().GetStr().GetValue()) + } + } + } + _ = iterator.Close() + g = append(g, shardStruct{ + id: shard.ID(), + location: []string{ + fmt.Sprintf("series_%v", series.ID()), + "data_flow_" + strconv.Itoa(dataFlowID), + }, + elements: elements, + }) + } + + return g, nil + }(got) + if err != nil { + return nil, err + } + } + } + return got, nil +} + //go:embed testdata/*.json var dataFS embed.FS @@ -261,7 +491,7 @@ func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (base t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily)) e := &streamv2.ElementValue{ ElementId: strconv.Itoa(i), - Timestamp: timestamppb.New(baseTime.Add(time.Millisecond * time.Duration(i))), + Timestamp: timestamppb.New(baseTime.Add(500 * time.Millisecond * time.Duration(i))), TagFamilies: []*streamv2.ElementValue_TagFamily{ { Tags: []*modelv2.TagValue{ @@ -282,6 +512,7 @@ func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (base itemID, err := stream.write(common.ShardID(shardID), e) t.NoError(err) sa, err := stream.Shards(entity) + t.NoError(err) for _, shard := range sa { se, err := shard.Series().Get(entity) t.NoError(err) diff --git a/banyand/stream/testdata/single_series.json b/banyand/stream/testdata/global_index.json similarity index 57% rename from banyand/stream/testdata/single_series.json rename to banyand/stream/testdata/global_index.json index 049ec0c..9e81928 100644 --- a/banyand/stream/testdata/single_series.json +++ b/banyand/stream/testdata/global_index.json @@ -1,7 +1,7 @@ [ { "tags": [ - {"str":{"value": "trace_id-xxfff.111323"}}, + {"str":{"value": "1"}}, {"int":{"value": 0}}, {"str":{"value": "webapp_id"}}, {"str":{"value": "10.0.0.1_id"}}, @@ -12,35 +12,48 @@ }, { "tags": [ - {"str":{"value": "trace_id-xxfff.111323"}}, + {"str":{"value": "2"}}, {"int":{"value": 0}}, {"str":{"value": "webapp_id"}}, - {"str":{"value": "10.0.0.1_id"}}, - {"str":{"value": "/home_id"}}, + {"str":{"value": "10.0.0.3_id"}}, + {"str":{"value": "/product_id"}}, {"int":{"value": 500}}, {"int":{"value": 1622933202000000000}} ] }, { "tags": [ - {"str":{"value": "trace_id-xxfff.111323"}}, - {"int":{"value": 0}}, + {"str":{"value": "1"}}, + {"int":{"value": 1}}, {"str":{"value": "webapp_id"}}, {"str":{"value": "10.0.0.1_id"}}, {"str":{"value": "/home_id"}}, {"int":{"value": 30}}, {"int":{"value": 1622933202000000000}}, {"str":{"value": "GET"}}, - {"str":{"value": "200"}} + {"str":{"value": "500"}} ] }, { "tags": [ - {"str":{"value": "trace_id-xxfff.111323"}}, + {"str":{"value": "2"}}, {"int":{"value": 1}}, - {"str":{"value": "httpserver_id"}}, + {"str":{"value": "webapp_id"}}, + {"str":{"value": "10.0.0.5_id"}}, + {"str":{"value": "/price_id"}}, + {"int":{"value": 60}}, + {"int":{"value": 1622933202000000000}}, + {"str":{"value": "GET"}}, + {"str":{"value": "400"}} + ] + }, + { + "tags": [ + {"str":{"value": "2"}}, + {"int":{"value": 1}}, + {"str":{"value": "webapp_id"}}, {"str":{"value": "10.0.0.1_id"}}, - {"str":{"value": "/home_id"}}, + {"str":{"value": "/item_id"}}, {"int":{"value": 300}}, {"int":{"value": 1622933202000000000}}, {"str":{"value": "GET"}}, diff --git a/banyand/stream/testdata/multiple_shards.json b/banyand/stream/testdata/multiple_shards.json index 791c7c9..3f1a721 100644 --- a/banyand/stream/testdata/multiple_shards.json +++ b/banyand/stream/testdata/multiple_shards.json @@ -16,7 +16,7 @@ {"int":{"value": 0}}, {"str":{"value": "webapp_id"}}, {"str":{"value": "10.0.0.3_id"}}, - {"str":{"value": "/home_id"}}, + {"str":{"value": "/product_id"}}, {"int":{"value": 500}}, {"int":{"value": 1622933202000000000}} ] @@ -40,7 +40,7 @@ {"int":{"value": 1}}, {"str":{"value": "webapp_id"}}, {"str":{"value": "10.0.0.5_id"}}, - {"str":{"value": "/home_id"}}, + {"str":{"value": "/price_id"}}, {"int":{"value": 60}}, {"int":{"value": 1622933202000000000}}, {"str":{"value": "GET"}}, @@ -53,7 +53,7 @@ {"int":{"value": 1}}, {"str":{"value": "webapp_id"}}, {"str":{"value": "10.0.0.1_id"}}, - {"str":{"value": "/home_id"}}, + {"str":{"value": "/item_id"}}, {"int":{"value": 300}}, {"int":{"value": 1622933202000000000}}, {"str":{"value": "GET"}}, diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 7d5168f..44f1922 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -81,22 +81,10 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { } b.closableLst = append(b.closableLst, b.store, b.primaryIndex) rules, ok := ctx.Value(indexRulesKey).([]*databasev2.IndexRule) - var specs []index.FieldSpec - for _, rule := range rules { - if rule.GetLocation() == databasev2.IndexRule_LOCATION_SERIES { - specs = append(specs, index.FieldSpec{ - Name: rule.GetMetadata().GetName(), - }) - } - } - if !ok || len(specs) == 0 { + if !ok || len(rules) == 0 { return b, nil } b.invertedIndex = inverted.NewStore("inverted") - err = b.invertedIndex.Initialize(specs) - if err != nil { - return nil, err - } return b, nil } diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go index c81c727..c9fd9d0 100644 --- a/banyand/tsdb/indexdb.go +++ b/banyand/tsdb/indexdb.go @@ -24,6 +24,7 @@ import ( "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/index" ) @@ -54,7 +55,7 @@ type indexDB struct { } func (i *indexDB) Seek(term index.Field) ([]GlobalItemID, error) { - var result []GlobalItemID + result := make([]GlobalItemID, 0) err := i.lst[0].globalIndex.GetAll(term.Marshal(), func(rawBytes []byte) error { id := &GlobalItemID{} err := id.UnMarshal(rawBytes) @@ -64,6 +65,9 @@ func (i *indexDB) Seek(term index.Field) ([]GlobalItemID, error) { result = append(result, *id) return nil }) + if err == kv.ErrKeyNotFound { + return result, nil + } return result, err } diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go index 376e39c..c9ec07b 100644 --- a/banyand/tsdb/series.go +++ b/banyand/tsdb/series.go @@ -37,37 +37,37 @@ var ( ) type GlobalItemID struct { - shardID common.ShardID + ShardID common.ShardID segID uint16 blockID uint16 - seriesID common.SeriesID - id common.ItemID + SeriesID common.SeriesID + ID common.ItemID } func (i *GlobalItemID) Marshal() []byte { return bytes.Join([][]byte{ - convert.Uint32ToBytes(uint32(i.shardID)), + convert.Uint32ToBytes(uint32(i.ShardID)), convert.Uint16ToBytes(i.segID), convert.Uint16ToBytes(i.blockID), - convert.Uint64ToBytes(uint64(i.seriesID)), - convert.Uint64ToBytes(uint64(i.id)), + convert.Uint64ToBytes(uint64(i.SeriesID)), + convert.Uint64ToBytes(uint64(i.ID)), }, nil) } func (i *GlobalItemID) UnMarshal(data []byte) error { - if len(data) <= 32+16+16+64+64 { + if len(data) != 4+2+2+8+8 { return ErrItemIDMalformed } var offset int - i.shardID = common.ShardID(convert.BytesToUint32(data[offset : offset+4])) + i.ShardID = common.ShardID(convert.BytesToUint32(data[offset : offset+4])) offset += 4 i.segID = convert.BytesToUint16(data[offset : offset+2]) offset += 2 i.blockID = convert.BytesToUint16(data[offset : offset+2]) offset += 2 - i.seriesID = common.SeriesID(convert.BytesToUint64(data[offset : offset+8])) + i.SeriesID = common.SeriesID(convert.BytesToUint64(data[offset : offset+8])) offset += 8 - i.id = common.ItemID(convert.BytesToUint64(data[offset:])) + i.ID = common.ItemID(convert.BytesToUint64(data[offset:])) return nil } @@ -123,7 +123,7 @@ func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) { b := s.blockDB.block(id) return &item{ data: b.dataReader(), - itemID: id.id, + itemID: id.ID, seriesID: s.id, }, b, nil } diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go index 1c4f338..a6cf6c0 100644 --- a/banyand/tsdb/series_seek.go +++ b/banyand/tsdb/series_seek.go @@ -18,6 +18,8 @@ package tsdb import ( + "time" + "github.com/apache/skywalking-banyandb/api/common" databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2" modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2" @@ -72,7 +74,12 @@ func (s *seekerBuilder) Build() (Seeker, error) { } filters := []filterFn{ func(item Item) bool { - return s.seriesSpan.timeRange.contains(item.Time()) + valid := s.seriesSpan.timeRange.contains(item.Time()) + timeRange := s.seriesSpan.timeRange + s.seriesSpan.l.Trace(). + Times("time_range", []time.Time{timeRange.Start, timeRange.End}). + Bool("valid", valid).Msg("filter item by time range") + return valid }, } if indexFilter != nil { diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go index 8814a16..990d1ae 100644 --- a/banyand/tsdb/series_seek_filter.go +++ b/banyand/tsdb/series_seek_filter.go @@ -53,7 +53,7 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) { //TODO:// should support composite index rule return nil, ErrUnsupportedIndexRule } - var cond index.Condition + cond := make(index.Condition) term := index.Term{ SeriesID: s.seriesSpan.seriesID, IndexRule: condition.indexRule, @@ -100,7 +100,9 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) { } } return func(item Item) bool { - return allItemIDs.Contains(item.ID()) + valid := allItemIDs.Contains(item.ID()) + s.seriesSpan.l.Trace().Int("valid_item_num", allItemIDs.Len()).Bool("valid", valid).Msg("filter item by index") + return valid }, nil } diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go index 3b26a8c..8e45a91 100644 --- a/banyand/tsdb/series_seek_sort.go +++ b/banyand/tsdb/series_seek_sort.go @@ -53,13 +53,19 @@ func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator { func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) { for _, b := range s.seriesSpan.blocks { + var inner index.FieldIterator + term := index.Term{ + SeriesID: s.seriesSpan.seriesID, + IndexRule: s.indexRuleForSorting.GetMetadata().GetName(), + } switch s.indexRuleForSorting.GetType() { case databasev2.IndexRule_TYPE_TREE: - series = append(series, newSearcherIterator(s.seriesSpan.l, b.lsmIndexReader(). - FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters)) + inner = b.lsmIndexReader().FieldIterator(term.Marshal(), s.order) case databasev2.IndexRule_TYPE_INVERTED: - series = append(series, newSearcherIterator(s.seriesSpan.l, b.invertedIndexReader(). - FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters)) + inner = b.invertedIndexReader().FieldIterator(term.Marshal(), s.order) + } + if inner != nil { + series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters)) } } return @@ -79,16 +85,17 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator { }) } delegated := make([]Iterator, 0, len(bb)) - var bTimes []time.Time + bTimes := make([]time.Time, 0, len(bb)) for _, b := range bb { bTimes = append(bTimes, b.startTime()) - delegated = append(delegated, newSearcherIterator( - s.seriesSpan.l, - b.primaryIndexReader(). - FieldIterator( - s.seriesSpan.seriesID.Marshal(), - s.order, - ), b.dataReader(), s.seriesSpan.seriesID, filters)) + inner := b.primaryIndexReader(). + FieldIterator( + s.seriesSpan.seriesID.Marshal(), + s.order, + ) + if inner != nil { + delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters)) + } } s.seriesSpan.l.Debug(). Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]). @@ -116,7 +123,7 @@ func (s *searcherIterator) Next() bool { v := s.fieldIterator.Val() s.cur = v.Value.Iterator() s.curKey = v.Key - s.l.Trace().Hex("term_field", s.curKey).Msg("got a new field") + s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Hex("term_field", s.curKey).Msg("got a new field") } else { return false } @@ -125,11 +132,11 @@ func (s *searcherIterator) Next() bool { for _, filter := range s.filters { if !filter(s.Val()) { - s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item") + s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item") return s.Next() } } - s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("got an item") + s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Uint64("item_id", uint64(s.Val().ID())).Msg("got an item") return true } s.cur = nil @@ -173,9 +180,8 @@ func (m *mergedIterator) Next() bool { m.index++ if m.index >= len(m.delegated) { return false - } else { - m.curr = m.delegated[m.index] } + m.curr = m.delegated[m.index] } hasNext := m.curr.Next() if !hasNext { diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go index af59b53..b335be9 100644 --- a/banyand/tsdb/series_write.go +++ b/banyand/tsdb/series_write.go @@ -96,11 +96,11 @@ func (w *writerBuilder) Build() (Writer, error) { block: w.block, ts: w.ts, itemID: &GlobalItemID{ - shardID: w.series.shardID, + ShardID: w.series.shardID, segID: segID, blockID: blockID, - seriesID: w.series.seriesID, - id: common.ItemID(uint64(w.ts.UnixNano())), + SeriesID: w.series.seriesID, + ID: common.ItemID(uint64(w.ts.UnixNano())), }, columns: w.values, }, nil @@ -131,20 +131,20 @@ func (w *writer) ItemID() GlobalItemID { func (w *writer) WriteLSMIndex(field index.Field) error { t := index.Term{ - SeriesID: w.itemID.seriesID, + SeriesID: w.itemID.SeriesID, IndexRule: string(field.Term), } field.Term = t.Marshal() - return w.block.writeLSMIndex(field, w.itemID.id) + return w.block.writeLSMIndex(field, w.itemID.ID) } func (w *writer) WriteInvertedIndex(field index.Field) error { t := index.Term{ - SeriesID: w.itemID.seriesID, + SeriesID: w.itemID.SeriesID, IndexRule: string(field.Term), } field.Term = t.Marshal() - return w.block.writeInvertedIndex(field, w.itemID.id) + return w.block.writeInvertedIndex(field, w.itemID.ID) } type dataBucket struct { @@ -163,7 +163,7 @@ func (w *writer) Write() (GlobalItemID, error) { id := w.ItemID() for _, c := range w.columns { err := w.block.write(dataBucket{ - seriesID: w.itemID.seriesID, + seriesID: w.itemID.SeriesID, family: c.family, }.marshal(), c.val, w.ts) @@ -172,7 +172,7 @@ func (w *writer) Write() (GlobalItemID, error) { } } return id, w.block.writePrimaryIndex(index.Field{ - Term: id.seriesID.Marshal(), + Term: id.SeriesID.Marshal(), Value: convert.Int64ToBytes(w.ts.UnixNano()), - }, id.id) + }, id.ID) } diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index ccae717..422029f 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -87,6 +87,7 @@ func NewPath(entries []Entry) Path { type SeriesDatabase interface { io.Closer + GetByID(id common.SeriesID) (Series, error) Get(entity Entity) (Series, error) List(path Path) (SeriesList, error) } @@ -109,6 +110,10 @@ type seriesDB struct { sID common.ShardID } +func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) { + return newSeries(s.context(), id, s), nil +} + func (s *seriesDB) block(id GlobalItemID) blockDelegate { return s.lst[id.segID].lst[id.blockID].delegate() } diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go index a87d24c..8819772 100644 --- a/banyand/tsdb/seriesdb_test.go +++ b/banyand/tsdb/seriesdb_test.go @@ -338,7 +338,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID { } func newMockSeries(id common.SeriesID) *series { - return newSeries(nil, id, nil) + return newSeries(context.TODO(), id, nil) } func transform(list SeriesList) (seriesIDs []common.SeriesID) { diff --git a/pkg/index/index.go b/pkg/index/index.go index 46c70be..d6cbf86 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -33,10 +33,6 @@ func (f Field) Marshal() []byte { return bytes.Join([][]byte{f.Term, f.Value}, nil) } -type FieldSpec struct { - Name string -} - type RangeOpts struct { Upper []byte Lower []byte diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go index f8d3789..57826d1 100644 --- a/pkg/index/inverted/field_map.go +++ b/pkg/index/inverted/field_map.go @@ -18,6 +18,8 @@ package inverted import ( + "sync" + "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" @@ -30,7 +32,8 @@ var ErrFieldAbsent = errors.New("field doesn't exist") type fieldHashID uint64 type fieldMap struct { - repo map[fieldHashID]*fieldValue + repo map[fieldHashID]*fieldValue + mutex sync.RWMutex } func newFieldMap(initialSize int) *fieldMap { @@ -39,22 +42,32 @@ func newFieldMap(initialSize int) *fieldMap { } } -func (fm *fieldMap) createKey(key []byte) { - fm.repo[fieldHashID(convert.Hash(key))] = &fieldValue{ +func (fm *fieldMap) createKey(key []byte) *fieldValue { + result := &fieldValue{ key: key, value: newPostingMap(), } + fm.repo[fieldHashID(convert.Hash(key))] = result + return result } func (fm *fieldMap) get(key []byte) (*fieldValue, bool) { + fm.mutex.RLock() + defer fm.mutex.RUnlock() + return fm.getWithoutLock(key) +} + +func (fm *fieldMap) getWithoutLock(key []byte) (*fieldValue, bool) { v, ok := fm.repo[fieldHashID(convert.Hash(key))] return v, ok } func (fm *fieldMap) put(fv index.Field, id common.ItemID) error { - pm, ok := fm.get(fv.Term) + fm.mutex.Lock() + defer fm.mutex.Unlock() + pm, ok := fm.getWithoutLock(fv.Term) if !ok { - return errors.Wrapf(ErrFieldAbsent, "filed Term:%s", fv.Term) + pm = fm.createKey(fv.Term) } return pm.value.put(fv.Value, id) } diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index d610d54..569d434 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -24,7 +24,6 @@ import ( type GlobalStore interface { Searcher() index.Searcher - Initialize(fields []index.FieldSpec) error Insert(field index.Field, docID common.ItemID) error } @@ -37,10 +36,6 @@ func (s *store) Searcher() index.Searcher { return s.memTable } -func (s *store) Initialize(fields []index.FieldSpec) error { - return s.memTable.Initialize(fields) -} - func (s *store) Insert(field index.Field, chunkID common.ItemID) error { return s.memTable.Insert(field, chunkID) } diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go index e2db1f6..a6e2c12 100644 --- a/pkg/index/inverted/mem.go +++ b/pkg/index/inverted/mem.go @@ -41,21 +41,11 @@ type MemTable struct { func NewMemTable(name string) *MemTable { return &MemTable{ - name: name, + name: name, + terms: newFieldMap(1000), } } -func (m *MemTable) Initialize(fields []index.FieldSpec) error { - if len(fields) < 1 { - return ErrFieldsAbsent - } - m.terms = newFieldMap(len(fields)) - for _, f := range fields { - m.terms.createKey([]byte(f.Name)) - } - return nil -} - func (m *MemTable) Insert(field index.Field, chunkID common.ItemID) error { return m.terms.put(field, chunkID) } diff --git a/pkg/index/inverted/mem_test.go b/pkg/index/inverted/mem_test.go index 45117a7..d3c4731 100644 --- a/pkg/index/inverted/mem_test.go +++ b/pkg/index/inverted/mem_test.go @@ -26,56 +26,15 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" ) -func TestMemTable_Initialize(t *testing.T) { - type args struct { - fields []FieldSpec - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "golden path", - args: args{ - fields: []FieldSpec{ - { - Name: "service_name", - }, - { - Name: "duration", - }, - }, - }, - }, - { - name: "fields absent", - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - m := NewMemTable("sw") - var err error - if err = m.Initialize(tt.args.fields); (err != nil) != tt.wantErr { - t.Errorf("Initialize() error = %v, wantErr %v", err, tt.wantErr) - } - if err != nil { - return - } - assert.Equal(t, len(m.terms.repo), len(tt.args.fields)) - }) - } -} - func TestMemTable_Range(t *testing.T) { type args struct { fieldName []byte - opts *RangeOpts + opts index.RangeOpts } m := NewMemTable("sw") setUp(t, m) @@ -88,13 +47,13 @@ func TestMemTable_Range(t *testing.T) { name: "in range", args: args{ fieldName: []byte("duration"), - opts: &RangeOpts{ + opts: index.RangeOpts{ Lower: convert.Uint16ToBytes(100), Upper: convert.Uint16ToBytes(500), }, }, - wantList: m.MatchTerms(&Field{ - Name: []byte("duration"), + wantList: m.MatchTerms(index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(200), }), }, @@ -102,14 +61,14 @@ func TestMemTable_Range(t *testing.T) { name: "excludes edge", args: args{ fieldName: []byte("duration"), - opts: &RangeOpts{ + opts: index.RangeOpts{ Lower: convert.Uint16ToBytes(50), Upper: convert.Uint16ToBytes(1000), }, }, wantList: union(m, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(200), }, ), @@ -118,19 +77,19 @@ func TestMemTable_Range(t *testing.T) { name: "includes lower", args: args{ fieldName: []byte("duration"), - opts: &RangeOpts{ + opts: index.RangeOpts{ Lower: convert.Uint16ToBytes(50), Upper: convert.Uint16ToBytes(1000), IncludesLower: true, }, }, wantList: union(m, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(50), }, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(200), }, ), @@ -139,19 +98,19 @@ func TestMemTable_Range(t *testing.T) { name: "includes upper", args: args{ fieldName: []byte("duration"), - opts: &RangeOpts{ + opts: index.RangeOpts{ Lower: convert.Uint16ToBytes(50), Upper: convert.Uint16ToBytes(1000), IncludesUpper: true, }, }, wantList: union(m, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(200), }, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(1000), }, ), @@ -160,7 +119,7 @@ func TestMemTable_Range(t *testing.T) { name: "includes edges", args: args{ fieldName: []byte("duration"), - opts: &RangeOpts{ + opts: index.RangeOpts{ Lower: convert.Uint16ToBytes(50), Upper: convert.Uint16ToBytes(1000), IncludesUpper: true, @@ -168,16 +127,16 @@ func TestMemTable_Range(t *testing.T) { }, }, wantList: union(m, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(50), }, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(200), }, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(1000), }, ), @@ -186,7 +145,7 @@ func TestMemTable_Range(t *testing.T) { name: "match one", args: args{ fieldName: []byte("duration"), - opts: &RangeOpts{ + opts: index.RangeOpts{ Lower: convert.Uint16ToBytes(200), Upper: convert.Uint16ToBytes(200), IncludesUpper: true, @@ -194,8 +153,8 @@ func TestMemTable_Range(t *testing.T) { }, }, wantList: union(m, - &Field{ - Name: []byte("duration"), + index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(200), }, ), @@ -249,14 +208,14 @@ func TestMemTable_Iterator(t *testing.T) { _ = iter.Close() }() for iter.Next() { - got = append(got, iter.Val().key) + got = append(got, iter.Val().Key) } tester.Equal(tt.want, got) }) } } -func union(memTable *MemTable, fields ...*Field) posting.List { +func union(memTable *MemTable, fields ...index.Field) posting.List { result := roaring.NewPostingList() for _, f := range fields { _ = result.Union(memTable.MatchTerms(f)) @@ -265,23 +224,15 @@ func union(memTable *MemTable, fields ...*Field) posting.List { } func setUp(t *testing.T, mt *MemTable) { - assert.NoError(t, mt.Initialize([]FieldSpec{ - { - Name: "service_name", - }, - { - Name: "duration", - }, - })) for i := 0; i < 100; i++ { if i%2 == 0 { - assert.NoError(t, mt.Insert(&Field{ - Name: []byte("service_name"), + assert.NoError(t, mt.Insert(index.Field{ + Term: []byte("service_name"), Value: []byte("gateway"), }, common.ItemID(i))) } else { - assert.NoError(t, mt.Insert(&Field{ - Name: []byte("service_name"), + assert.NoError(t, mt.Insert(index.Field{ + Term: []byte("service_name"), Value: []byte("webpage"), }, common.ItemID(i))) } @@ -289,18 +240,18 @@ func setUp(t *testing.T, mt *MemTable) { for i := 100; i < 200; i++ { switch { case i%3 == 0: - assert.NoError(t, mt.Insert(&Field{ - Name: []byte("duration"), + assert.NoError(t, mt.Insert(index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(50), }, common.ItemID(i))) case i%3 == 1: - assert.NoError(t, mt.Insert(&Field{ - Name: []byte("duration"), + assert.NoError(t, mt.Insert(index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(200), }, common.ItemID(i))) case i%3 == 2: - assert.NoError(t, mt.Insert(&Field{ - Name: []byte("duration"), + assert.NoError(t, mt.Insert(index.Field{ + Term: []byte("duration"), Value: convert.Uint16ToBytes(1000), }, common.ItemID(i))) } diff --git a/pkg/index/search.go b/pkg/index/search.go index 3fa8bd4..833d904 100644 --- a/pkg/index/search.go +++ b/pkg/index/search.go @@ -268,7 +268,7 @@ type not struct { } func (n *not) Execute() (posting.List, error) { - all := n.searcher.MatchField([]byte(n.Key)) + all := n.searcher.MatchField(n.Key) list, err := n.Inner.Execute() if err != nil { return nil, err @@ -289,7 +289,7 @@ type eq struct { func (eq *eq) Execute() (posting.List, error) { return eq.searcher.MatchTerms(Field{ - Term: []byte(eq.Key), + Term: eq.Key, Value: bytes.Join(eq.Values, nil), }), nil }
