This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch index-posting in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 1400dd16ddbb511245cf19f475f2bab8fd5ed6ac Author: Gao Hongtao <[email protected]> AuthorDate: Tue Jul 20 12:58:34 2021 +0800 Introduce posting list Signed-off-by: Gao Hongtao <[email protected]> --- api/common/id.go | 32 ---- banyand/index/index.go | 3 +- banyand/series/idgen.go | 20 +-- banyand/series/series.go | 3 +- banyand/series/trace/common_test.go | 14 +- banyand/series/trace/query.go | 75 ++++++--- banyand/series/trace/query_test.go | 39 +++-- banyand/series/trace/schema_test.go | 2 +- banyand/series/trace/trace.go | 5 +- banyand/series/trace/write.go | 5 +- go.mod | 1 + go.sum | 8 + pkg/convert/number.go | 10 ++ banyand/index/index.go => pkg/posting/posting.go | 64 +++++--- pkg/posting/roaring/roaring.go | 194 +++++++++++++++++++++++ pkg/query/logical/common_test.go | 31 ++-- pkg/query/logical/plan_execution_test.go | 12 +- pkg/query/logical/plan_indexscan.go | 11 +- 18 files changed, 386 insertions(+), 143 deletions(-) diff --git a/api/common/id.go b/api/common/id.go index b06573a..443ca83 100644 --- a/api/common/id.go +++ b/api/common/id.go @@ -23,35 +23,3 @@ type SeriesID uint64 const ( DataBinaryFieldName = "data_binary" ) - -type ChunkIDs []ChunkID - -// HashIntersect returns an intersection of two ChunkID arrays -// without any assumptions on the order. It uses a HashMap to mark -// the existence of a item. -func (c ChunkIDs) HashIntersect(other ChunkIDs) ChunkIDs { - if len(c) == 0 || len(other) == 0 { - return []ChunkID{} - } - smaller, larger, minLen := min(c, other) - intersection := make([]ChunkID, 0, minLen) - hash := make(map[ChunkID]struct{}) - for _, item := range smaller { - hash[item] = struct{}{} - } - for _, item := range larger { - if _, exist := hash[item]; exist { - intersection = append(intersection, item) - } - } - return intersection -} - -func min(a, b ChunkIDs) (ChunkIDs, ChunkIDs, int) { - aLen := len(a) - bLen := len(b) - if aLen < bLen { - return a, b, aLen - } - return b, a, bLen -} diff --git a/banyand/index/index.go b/banyand/index/index.go index f8527c8..3c913ce 100644 --- a/banyand/index/index.go +++ b/banyand/index/index.go @@ -25,6 +25,7 @@ import ( apiv1 "github.com/apache/skywalking-banyandb/api/fbs/v1" "github.com/apache/skywalking-banyandb/banyand/discovery" "github.com/apache/skywalking-banyandb/banyand/queue" + posting "github.com/apache/skywalking-banyandb/pkg/posting" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -35,7 +36,7 @@ type Condition struct { } type Repo interface { - Search(index common.Metadata, startTime, endTime uint64, conditions []Condition) ([]common.ChunkID, error) + Search(index common.Metadata, shardID uint, startTime, endTime uint64, conditions []Condition) (posting.List, error) } type Builder interface { diff --git a/banyand/series/idgen.go b/banyand/series/idgen.go index e4d217a..9394c9d 100644 --- a/banyand/series/idgen.go +++ b/banyand/series/idgen.go @@ -21,16 +21,10 @@ import ( "time" ) -const ( - timeStampLen = 41 + 1 - shardIDLen = 22 -) - var startTime = uint64(time.Date(2021, 07, 01, 23, 0, 0, 0, time.UTC).UTC().UnixNano() / 1e6) type IDGen interface { - Next(shard uint, ts uint64) uint64 - ParseShardID(ID uint64) (uint, error) + Next(ts uint64) uint64 ParseTS(ID uint64) (uint64, error) } @@ -43,17 +37,11 @@ var _ IDGen = (*localID)(nil) type localID struct { } -func (l *localID) Next(shard uint, ts uint64) uint64 { +func (l *localID) Next(ts uint64) uint64 { df := ts/1e6 - startTime - id := (df << shardIDLen) | uint64(shard) - return id -} - -func (l *localID) ParseShardID(ID uint64) (uint, error) { - shardID := (ID << timeStampLen) >> timeStampLen - return uint(shardID), nil + return df } func (l *localID) ParseTS(ID uint64) (uint64, error) { - return (ID>>shardIDLen + startTime) * 1e6, nil + return (ID + startTime) * 1e6, nil } diff --git a/banyand/series/series.go b/banyand/series/series.go index db280f3..73a9e05 100644 --- a/banyand/series/series.go +++ b/banyand/series/series.go @@ -25,6 +25,7 @@ import ( "github.com/apache/skywalking-banyandb/api/data" v1 "github.com/apache/skywalking-banyandb/api/fbs/v1" "github.com/apache/skywalking-banyandb/banyand/series/schema" + posting2 "github.com/apache/skywalking-banyandb/pkg/posting" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -50,7 +51,7 @@ type TraceRepo interface { //FetchTrace returns data.Trace by traceID FetchTrace(traceSeries common.Metadata, traceID string, opt ScanOptions) (data.Trace, error) //FetchEntity returns data.Entity by ChunkID - FetchEntity(traceSeries common.Metadata, chunkIDs []common.ChunkID, opt ScanOptions) ([]data.Entity, error) + FetchEntity(traceSeries common.Metadata, shardID uint, chunkIDs posting2.List, opt ScanOptions) ([]data.Entity, error) //ScanEntity returns data.Entity between a duration by ScanOptions ScanEntity(traceSeries common.Metadata, startTime, endTime uint64, opt ScanOptions) ([]data.Entity, error) } diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go index fd0769a..acbc986 100644 --- a/banyand/series/trace/common_test.go +++ b/banyand/series/trace/common_test.go @@ -241,8 +241,8 @@ func getEntityWithTS(id string, binary []byte, ts uint64, items ...interface{}) } } -func setUpTestData(t *testing.T, ts *traceSeries, seriesEntities []seriesEntity) (chunkIDs []common.ChunkID) { - chunkIDs = make([]common.ChunkID, 0, len(seriesEntities)) +func setUpTestData(t *testing.T, ts *traceSeries, seriesEntities []seriesEntity) (results []idWithShard) { + results = make([]idWithShard, 0, len(seriesEntities)) for _, se := range seriesEntities { seriesID := []byte(se.seriesID) b := fb.NewWriteEntityBuilder() @@ -255,7 +255,8 @@ func setUpTestData(t *testing.T, ts *traceSeries, seriesEntities []seriesEntity) ) assert.NoError(t, err) we := v1.GetRootAsWriteEntity(builder.FinishedBytes(), 0) - got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), partition.ShardID(seriesID, 2), data.EntityValue{ + shardID := partition.ShardID(seriesID, 2) + got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), shardID, data.EntityValue{ EntityValue: we.Entity(nil), }) if err != nil { @@ -264,7 +265,10 @@ func setUpTestData(t *testing.T, ts *traceSeries, seriesEntities []seriesEntity) if got < 1 { t.Error("Write() got empty chunkID") } - chunkIDs = append(chunkIDs, got) + results = append(results, idWithShard{ + id: got, + shardID: shardID, + }) } - return chunkIDs + return results } diff --git a/banyand/series/trace/query.go b/banyand/series/trace/query.go index 4c2af23..7eb5bef 100644 --- a/banyand/series/trace/query.go +++ b/banyand/series/trace/query.go @@ -33,6 +33,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/fb" "github.com/apache/skywalking-banyandb/pkg/partition" + "github.com/apache/skywalking-banyandb/pkg/posting" + "github.com/apache/skywalking-banyandb/pkg/posting/roaring" ) func (t *traceSeries) FetchTrace(traceID string, opt series.ScanOptions) (trace data.Trace, err error) { @@ -52,13 +54,22 @@ func (t *traceSeries) FetchTrace(traceID string, opt series.ScanOptions) (trace if len(bb) < 1 { return trace, nil } - chunkIDs := make([]common.ChunkID, len(bb)) - for i, b := range bb { - chunkIDs[i] = common.ChunkID(convert.BytesToUint64(b)) + dataMap := make(map[uint]posting.List) + for _, b := range bb { + id := idWithShard{ + id: common.ChunkID(convert.BytesToUint64(b[2:])), + shardID: uint(convert.BytesToUint16(b[:2])), + } + placeID(dataMap, id) } - entities, errEntity := t.FetchEntity(chunkIDs, opt) - if errEntity != nil { - return trace, errEntity + var entities []data.Entity + for s, c := range dataMap { + ee, errEntity := t.FetchEntity(c, s, opt) + if errEntity != nil { + err = multierr.Append(err, errEntity) + continue + } + entities = append(entities, ee...) } return data.Trace{ KindVersion: data.TraceKindVersion, @@ -88,13 +99,14 @@ func (t *traceSeries) ScanEntity(startTime, endTime uint64, opt series.ScanOptio copy(key[1:], startTimeBytes) seekKeys = append(seekKeys, key) } - chunkIDs := make([]common.ChunkID, 0, total) + entities := make([]data.Entity, 0, total) var num uint32 opts := kv.DefaultScanOpts opts.PrefetchValues = false opts.PrefetchSize = int(total) var errAll error for i := uint(0); i < t.shardNum; i++ { + chunkIDs := roaring.NewPostingList() for _, seekKey := range seekKeys { state := seekKey[0] err := t.reader.Reader(i, startTimeIndex, startTime, endTime).Scan( @@ -113,8 +125,7 @@ func (t *traceSeries) ScanEntity(startTime, endTime uint64, opt series.ScanOptio } chunk := make([]byte, len(key)-8-1) copy(chunk, key[8+1:]) - chunkID := common.ChunkID(convert.BytesToUint64(chunk)) - chunkIDs = append(chunkIDs, chunkID) + chunkIDs.Insert(common.ChunkID(convert.BytesToUint64(chunk))) num++ if num > total { return kv.ErrStopScan @@ -125,23 +136,25 @@ func (t *traceSeries) ScanEntity(startTime, endTime uint64, opt series.ScanOptio errAll = multierr.Append(errAll, err) } } - } - if len(chunkIDs) < 1 { - return nil, errAll - } - entities, err := t.FetchEntity(chunkIDs, opt) - if err != nil { - errAll = multierr.Append(errAll, err) + if chunkIDs.IsEmpty() { + continue + } + ee, err := t.FetchEntity(chunkIDs, i, opt) + if err != nil { + errAll = multierr.Append(errAll, err) + continue + } + entities = append(entities, ee...) } return entities, errAll } -func (t *traceSeries) FetchEntity(chunkIDs []common.ChunkID, opt series.ScanOptions) (entities []data.Entity, err error) { - chunkIDsLen := len(chunkIDs) +func (t *traceSeries) FetchEntity(chunkIDs posting.List, shardID uint, opt series.ScanOptions) (entities []data.Entity, err error) { + chunkIDsLen := chunkIDs.Len() if chunkIDsLen < 1 { return nil, ErrChunkIDsEmpty } - entities = make([]data.Entity, 0, len(chunkIDs)) + entities = make([]data.Entity, 0, chunkIDsLen) fetchDataBinary, fetchFieldsIndices, errInfo := t.parseFetchInfo(opt) if errInfo != nil { return nil, errInfo @@ -149,12 +162,10 @@ func (t *traceSeries) FetchEntity(chunkIDs []common.ChunkID, opt series.ScanOpti if !fetchDataBinary && len(fetchFieldsIndices) < 1 { return nil, ErrProjectionEmpty } - for _, id := range chunkIDs { + + for iter := chunkIDs.Iterator(); iter.Next(); { + id := iter.Current() chunkID := uint64(id) - shardID, errParseID := t.idGen.ParseShardID(chunkID) - if errParseID != nil { - err = multierr.Append(err, errParseID) - } ts, errParseTS := t.idGen.ParseTS(chunkID) if errParseTS != nil { err = multierr.Append(err, errParseTS) @@ -252,3 +263,19 @@ func (t *traceSeries) getEntityByInternalRef(seriesID []byte, state State, fetch Entity: v1.GetRootAsEntity(b.FinishedBytes(), 0), }, nil } + +type idWithShard struct { + id common.ChunkID + shardID uint +} + +func placeID(chunkIDCriteria map[uint]posting.List, data idWithShard) { + list, ok := chunkIDCriteria[data.shardID] + if ok { + list.Insert(data.id) + return + } + list = roaring.NewPostingList() + list.Insert(data.id) + chunkIDCriteria[data.shardID] = list +} diff --git a/banyand/series/trace/query_test.go b/banyand/series/trace/query_test.go index 49530db..841c670 100644 --- a/banyand/series/trace/query_test.go +++ b/banyand/series/trace/query_test.go @@ -24,15 +24,17 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/series" + "github.com/apache/skywalking-banyandb/pkg/posting" ) func Test_traceSeries_FetchEntity(t *testing.T) { type args struct { chunkIDIndices []int - chunkIDs common.ChunkIDs + chunkIDs []idWithShard opt series.ScanOptions } tests := []struct { @@ -92,15 +94,23 @@ func Test_traceSeries_FetchEntity(t *testing.T) { { name: "invalid chunk ids", args: args{ - chunkIDs: common.ChunkIDs{0}, - opt: series.ScanOptions{Projection: []string{"trace_id", "data_binary"}}, + chunkIDs: []idWithShard{ + { + id: common.ChunkID(0), + }, + }, + opt: series.ScanOptions{Projection: []string{"trace_id", "data_binary"}}, }, wantErr: true, }, { name: "mix up invalid/valid ids", args: args{ - chunkIDs: common.ChunkIDs{0}, + chunkIDs: []idWithShard{ + { + id: common.ChunkID(0), + }, + }, chunkIDIndices: []int{0, 1}, opt: series.ScanOptions{Projection: []string{"trace_id", "data_binary"}}, }, @@ -128,20 +138,29 @@ func Test_traceSeries_FetchEntity(t *testing.T) { } ts, stopFunc := setup(t) defer stopFunc() - chunkIDs := setUpTestData(t, ts, testData(uint64(time.Now().UnixNano()))) + dataResult := setUpTestData(t, ts, testData(uint64(time.Now().UnixNano()))) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - chunkIDCriteria := make([]common.ChunkID, 0, len(tt.args.chunkIDIndices)) + chunkIDCriteria := make(map[uint]posting.List, 2) for i := range tt.args.chunkIDIndices { - chunkIDCriteria = append(chunkIDCriteria, chunkIDs[i]) + placeID(chunkIDCriteria, dataResult[i]) } for _, id := range tt.args.chunkIDs { - chunkIDCriteria = append(chunkIDCriteria, id) + placeID(chunkIDCriteria, id) + } + var entities ByEntityID + var err error + for s, c := range chunkIDCriteria { + ee, errFetch := ts.FetchEntity(c, s, tt.args.opt) + if errFetch != nil { + err = multierr.Append(err, errFetch) + } + entities = append(entities, ee...) } - entities, err := ts.FetchEntity(chunkIDCriteria, tt.args.opt) if (err != nil) != tt.wantErr { t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr) } + sort.Sort(entities) assert.Equal(t, len(tt.wantEntities), len(entities)) for i, e := range entities { assert.EqualValues(t, tt.wantEntities[i].entityID, e.EntityId()) @@ -266,7 +285,7 @@ func Test_traceSeries_ScanEntity(t *testing.T) { { name: "single result", args: args{ - start: baseTS, + start: 0, end: baseTS, }, wantEntities: []wantEntity{ diff --git a/banyand/series/trace/schema_test.go b/banyand/series/trace/schema_test.go index 57c0d1b..89b3369 100644 --- a/banyand/series/trace/schema_test.go +++ b/banyand/series/trace/schema_test.go @@ -59,7 +59,7 @@ func Test_service_RulesBySubject(t *testing.T) { want: getIndexRule("sw-index-rule", "default"), }, { - name: "got empty result", + name: "got empty idWithShard", args: args{ series: createSubject("sw", "default"), filter: func(object v1.IndexObject) bool { diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go index e6aa39d..68b3015 100644 --- a/banyand/series/trace/trace.go +++ b/banyand/series/trace/trace.go @@ -39,6 +39,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/fb" "github.com/apache/skywalking-banyandb/pkg/logger" + posting2 "github.com/apache/skywalking-banyandb/pkg/posting" ) const ( @@ -162,12 +163,12 @@ func (s *service) FetchTrace(traceSeries common.Metadata, traceID string, opt se return ts.FetchTrace(traceID, opt) } -func (s *service) FetchEntity(traceSeries common.Metadata, chunkIDs []common.ChunkID, opt series.ScanOptions) ([]data.Entity, error) { +func (s *service) FetchEntity(traceSeries common.Metadata, shardID uint, chunkIDs posting2.List, opt series.ScanOptions) ([]data.Entity, error) { ts, err := s.getSeries(traceSeries) if err != nil { return nil, err } - return ts.FetchEntity(chunkIDs, opt) + return ts.FetchEntity(chunkIDs, shardID, opt) } func (s *service) ScanEntity(traceSeries common.Metadata, startTime, endTime uint64, opt series.ScanOptions) ([]data.Entity, error) { diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go index 7b78b7d..5d28af7 100644 --- a/banyand/series/trace/write.go +++ b/banyand/series/trace/write.go @@ -44,7 +44,8 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data. return 0, errGetState } stateBytes := []byte{byte(state)} - chunkID := t.idGen.Next(shardID, entity.TimestampNanoseconds()) + tts := entity.TimestampNanoseconds() + chunkID := t.idGen.Next(tts) ts, errParseTS := t.idGen.ParseTS(chunkID) if errParseTS != nil { return 0, errors.Wrap(errParseTS, "failed to parse timestamp from chunk id") @@ -70,7 +71,7 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data. return 0, errors.Wrap(err, "failed to write chunkID index") } traceIDShardID := partition.ShardID(traceID, t.shardNum) - if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).Put(traceID, chunkIDBytes, entity.TimestampNanoseconds()); err != nil { + if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), entity.TimestampNanoseconds()); err != nil { return 0, errors.Wrap(err, "failed to Trace index") } err = wp.Writer(shardID, startTimeIndex).Put(bydb_bytes.Join(stateBytes, tsBytes, chunkIDBytes), nil) diff --git a/go.mod b/go.mod index 2f971df..317ce5a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/apache/skywalking-banyandb go 1.16 require ( + github.com/RoaringBitmap/roaring v0.9.1 // indirect github.com/cespare/xxhash v1.1.0 github.com/dgraph-io/badger/v3 v3.2011.1 github.com/golang/mock v1.3.1 diff --git a/go.sum b/go.sum index 24a16ba..a1c449b 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eWBKuPVSXZIpsaMwCI= +github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE= +github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM= +github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc= github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74 h1:BFHSkDBSYCtPxMgxGz07DfNRYS76KFVDlocQ2U9rY7E= github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74/go.mod h1:XieWaNygSGj5ZzSsZO4tQe/2wwLjCvESus4twFqxOKc= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -27,6 +31,8 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA= +github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= @@ -182,6 +188,8 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= diff --git a/pkg/convert/number.go b/pkg/convert/number.go index 3ad4948..c06d7a4 100644 --- a/pkg/convert/number.go +++ b/pkg/convert/number.go @@ -31,6 +31,12 @@ func Int64ToBytes(i int64) []byte { return buf } +func Uint16ToBytes(u uint16) []byte { + bs := make([]byte, 2) + binary.BigEndian.PutUint16(bs, u) + return bs +} + func Uint32ToBytes(u uint32) []byte { bs := make([]byte, 4) binary.BigEndian.PutUint32(bs, u) @@ -45,6 +51,10 @@ func BytesToUint32(b []byte) uint32 { return binary.BigEndian.Uint32(b) } +func BytesToUint16(b []byte) uint16 { + return binary.BigEndian.Uint16(b) +} + func IntToInt64(numbers ...int) []int64 { var arr []int64 for i := 0; i < len(numbers); i++ { diff --git a/banyand/index/index.go b/pkg/posting/posting.go similarity index 54% copy from banyand/index/index.go copy to pkg/posting/posting.go index f8527c8..3812162 100644 --- a/banyand/index/index.go +++ b/pkg/posting/posting.go @@ -15,39 +15,55 @@ // specific language governing permissions and limitations // under the License. -//go:generate mockgen -destination=./index_mock.go -package=index . Repo -package index +package posting import ( - "context" + "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" - apiv1 "github.com/apache/skywalking-banyandb/api/fbs/v1" - "github.com/apache/skywalking-banyandb/banyand/discovery" - "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/run" ) -type Condition struct { - Key string - Values [][]byte - Op apiv1.BinaryOp -} +var ErrListEmpty = errors.New("postings list is empty") -type Repo interface { - Search(index common.Metadata, startTime, endTime uint64, conditions []Condition) ([]common.ChunkID, error) -} +// List is a collection of common.ChunkID. +type List interface { + Contains(id common.ChunkID) bool -type Builder interface { - run.Config - run.PreRunner -} + IsEmpty() bool + + Max() (common.ChunkID, error) + + Len() int + + Iterator() Iterator + + Clone() List + + Equal(other List) bool + + Insert(i common.ChunkID) -type Service interface { - Repo - Builder + Intersect(other List) error + + Difference(other List) error + + Union(other List) error + + UnionMany(others []List) error + + AddIterator(iter Iterator) error + + AddRange(min, max common.ChunkID) error + + RemoveRange(min, max common.ChunkID) error + + Reset() } -func NewService(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) { - return nil, nil +type Iterator interface { + Next() bool + + Current() common.ChunkID + + Close() error } diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go new file mode 100644 index 0000000..7c778d0 --- /dev/null +++ b/pkg/posting/roaring/roaring.go @@ -0,0 +1,194 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package roaring + +import ( + "github.com/RoaringBitmap/roaring/roaring64" + "github.com/pkg/errors" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/posting" +) + +var ( + ErrIntersectRoaringOnly = errors.New("Intersect only supported between roaringDocId sets") + ErrUnionRoaringOnly = errors.New("Union only supported between roaringDocId sets") + ErrDifferenceRoaringOnly = errors.New("Difference only supported between roaringDocId sets") +) + +var _ posting.List = (*postingsList)(nil) + +// postingsList abstracts a Roaring Bitmap. +type postingsList struct { + bitmap *roaring64.Bitmap +} + +func NewPostingList() posting.List { + return &postingsList{ + bitmap: roaring64.New(), + } +} + +func NewPostingListWithInitialData(data ...uint64) posting.List { + list := NewPostingList() + for _, d := range data { + list.Insert(common.ChunkID(d)) + } + return list +} + +func (p *postingsList) Contains(id common.ChunkID) bool { + return p.bitmap.Contains(uint64(id)) +} + +func (p *postingsList) IsEmpty() bool { + return p.bitmap.IsEmpty() +} + +func (p *postingsList) Max() (common.ChunkID, error) { + if p.IsEmpty() { + return 0, posting.ErrListEmpty + } + return common.ChunkID(p.bitmap.Maximum()), nil +} + +func (p *postingsList) Len() int { + return int(p.bitmap.GetCardinality()) +} + +func (p *postingsList) Iterator() posting.Iterator { + return &roaringIterator{ + iter: p.bitmap.Iterator(), + } +} + +func (p *postingsList) Clone() posting.List { + return &postingsList{ + bitmap: p.bitmap.Clone(), + } +} + +func (p *postingsList) Equal(other posting.List) bool { + if p.Len() != other.Len() { + return false + } + + iter := p.Iterator() + otherIter := other.Iterator() + + for iter.Next() { + if !otherIter.Next() { + return false + } + if iter.Current() != otherIter.Current() { + return false + } + } + + return true +} + +func (p *postingsList) Insert(id common.ChunkID) { + p.bitmap.Add(uint64(id)) +} + +func (p *postingsList) Intersect(other posting.List) error { + o, ok := other.(*postingsList) + if !ok { + return ErrIntersectRoaringOnly + } + p.bitmap.And(o.bitmap) + return nil +} + +func (p *postingsList) Difference(other posting.List) error { + o, ok := other.(*postingsList) + if !ok { + return ErrDifferenceRoaringOnly + } + p.bitmap.AndNot(o.bitmap) + return nil +} + +func (p *postingsList) Union(other posting.List) error { + o, ok := other.(*postingsList) + if !ok { + return ErrUnionRoaringOnly + } + p.bitmap.Or(o.bitmap) + return nil +} + +func (p *postingsList) UnionMany(others []posting.List) error { + for _, other := range others { + err := p.Union(other) + if err != nil { + return err + } + } + return nil +} + +func (p *postingsList) AddIterator(iter posting.Iterator) error { + for iter.Next() { + p.Insert(iter.Current()) + } + return nil +} + +func (p *postingsList) AddRange(min, max common.ChunkID) error { + for i := min; i < max; i++ { + p.bitmap.Add(uint64(i)) + } + return nil +} + +func (p *postingsList) RemoveRange(min, max common.ChunkID) error { + for i := min; i < max; i++ { + p.bitmap.Remove(uint64(i)) + } + return nil +} + +func (p *postingsList) Reset() { + p.bitmap.Clear() +} + +type roaringIterator struct { + iter roaring64.IntIterable64 + current common.ChunkID + closed bool +} + +func (it *roaringIterator) Current() common.ChunkID { + return it.current +} + +func (it *roaringIterator) Next() bool { + if it.closed || !it.iter.HasNext() { + return false + } + v := it.iter.Next() + it.current = common.ChunkID(v) + return true +} + +func (it *roaringIterator) Close() error { + it.closed = true + return nil +} diff --git a/pkg/query/logical/common_test.go b/pkg/query/logical/common_test.go index 36c4e26..a455e98 100644 --- a/pkg/query/logical/common_test.go +++ b/pkg/query/logical/common_test.go @@ -35,6 +35,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/index" "github.com/apache/skywalking-banyandb/banyand/series" "github.com/apache/skywalking-banyandb/pkg/fb" + "github.com/apache/skywalking-banyandb/pkg/posting" "github.com/apache/skywalking-banyandb/pkg/query/executor" executor2 "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" @@ -76,25 +77,24 @@ func GeneratorFromRange(l, r common.ChunkID) ChunkIDGenerator { var _ ChunkIDGenerator = (*arrayChunkIDGenerator)(nil) type arrayChunkIDGenerator struct { - chunkIDs []common.ChunkID - ptr int + hasNext bool + iter posting.Iterator } func (a *arrayChunkIDGenerator) Next() common.ChunkID { - defer func() { - a.ptr++ - }() - return a.chunkIDs[a.ptr] + a.hasNext = a.iter.Next() + return a.iter.Current() } func (a *arrayChunkIDGenerator) HasNext() bool { - return a.ptr < len(a.chunkIDs) + return a.hasNext } -func GeneratorFromArray(chunkIDs []common.ChunkID) ChunkIDGenerator { +func GeneratorFromArray(chunkIDs posting.List) ChunkIDGenerator { + iter := chunkIDs.Iterator() return &arrayChunkIDGenerator{ - chunkIDs: chunkIDs, - ptr: 0, + iter: iter, + hasNext: iter.Next(), } } @@ -152,18 +152,19 @@ func (f *mockDataFactory) MockTraceIDFetch(traceID string) executor2.ExecutionCo return ec } +//TODO: pass correct shardID func (f *mockDataFactory) MockIndexScan(startTime, endTime time.Time, indexMatches ...*indexMatcher) executor2.ExecutionContext { ec := executor.NewMockExecutionContext(f.ctrl) for _, im := range indexMatches { ec. EXPECT(). - Search(*f.traceMetadata, uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), im). + Search(*f.traceMetadata, uint(0), uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), im). Return(im.chunkIDs, nil) } ec. EXPECT(). - FetchEntity(*f.traceMetadata, gomock.Any(), gomock.Any()). - DoAndReturn(func(_ common.Metadata, chunkIDs []common.ChunkID, _ series.ScanOptions) ([]data.Entity, error) { + FetchEntity(*f.traceMetadata, uint(0), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ common.Metadata, _ uint, chunkIDs posting.List, _ series.ScanOptions) ([]data.Entity, error) { return GenerateEntities(GeneratorFromArray(chunkIDs)), nil }) return ec @@ -196,7 +197,7 @@ var _ gomock.Matcher = (*indexMatcher)(nil) type indexMatcher struct { key string - chunkIDs []common.ChunkID + chunkIDs posting.List } func (i *indexMatcher) Matches(x interface{}) bool { @@ -214,7 +215,7 @@ func (i *indexMatcher) String() string { return fmt.Sprintf("is search for key %s", i.key) } -func NewIndexMatcher(key string, chunkIDs []common.ChunkID) *indexMatcher { +func NewIndexMatcher(key string, chunkIDs posting.List) *indexMatcher { return &indexMatcher{ key: key, chunkIDs: chunkIDs, diff --git a/pkg/query/logical/plan_execution_test.go b/pkg/query/logical/plan_execution_test.go index 6c8682f..1aaade8 100644 --- a/pkg/query/logical/plan_execution_test.go +++ b/pkg/query/logical/plan_execution_test.go @@ -24,9 +24,9 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" - "github.com/apache/skywalking-banyandb/api/common" apiv1 "github.com/apache/skywalking-banyandb/api/fbs/v1" "github.com/apache/skywalking-banyandb/banyand/series" + "github.com/apache/skywalking-banyandb/pkg/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/query/executor" logical2 "github.com/apache/skywalking-banyandb/pkg/query/logical" ) @@ -156,7 +156,7 @@ func TestPlanExecution_IndexScan(t *testing.T) { unresolvedPlan: logical2.IndexScan(uint64(st.UnixNano()), uint64(et.UnixNano()), m, []logical2.Expr{ logical2.Eq(logical2.NewFieldRef("http.method"), logical2.Str("GET")), }, series.TraceStateDefault), - indexMatchers: []*indexMatcher{NewIndexMatcher("http.method", []common.ChunkID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})}, + indexMatchers: []*indexMatcher{NewIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))}, wantLength: 10, }, { @@ -166,8 +166,8 @@ func TestPlanExecution_IndexScan(t *testing.T) { logical2.Eq(logical2.NewFieldRef("status_code"), logical2.Str("200")), }, series.TraceStateDefault), indexMatchers: []*indexMatcher{ - NewIndexMatcher("http.method", []common.ChunkID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), - NewIndexMatcher("status_code", []common.ChunkID{1, 3, 5, 7, 9}), + NewIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)), + NewIndexMatcher("status_code", roaring.NewPostingListWithInitialData(1, 3, 5, 7, 9)), }, wantLength: 5, }, @@ -178,8 +178,8 @@ func TestPlanExecution_IndexScan(t *testing.T) { logical2.Eq(logical2.NewFieldRef("status_code"), logical2.Str("200")), }, series.TraceStateDefault), indexMatchers: []*indexMatcher{ - NewIndexMatcher("http.method", []common.ChunkID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), - NewIndexMatcher("status_code", []common.ChunkID{}), + NewIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)), + NewIndexMatcher("status_code", roaring.NewPostingList()), }, wantLength: 0, }, diff --git a/pkg/query/logical/plan_indexscan.go b/pkg/query/logical/plan_indexscan.go index b64c885..484edab 100644 --- a/pkg/query/logical/plan_indexscan.go +++ b/pkg/query/logical/plan_indexscan.go @@ -29,6 +29,7 @@ import ( apiv1 "github.com/apache/skywalking-banyandb/api/fbs/v1" "github.com/apache/skywalking-banyandb/banyand/index" "github.com/apache/skywalking-banyandb/banyand/series" + "github.com/apache/skywalking-banyandb/pkg/posting" executor2 "github.com/apache/skywalking-banyandb/pkg/query/executor" ) @@ -107,12 +108,13 @@ type indexScan struct { } func (i *indexScan) Execute(ec executor2.ExecutionContext) ([]data.Entity, error) { - var chunkSet common.ChunkIDs + var chunkSet posting.List for _, exprs := range i.conditionMap { // TODO: Discuss which metadata should be used! // 1) traceSeries Metadata: indirect mapping // 2) indexRule Metadata: cannot uniquely determine traceSeries - chunks, err := ec.Search(*i.traceMetadata, i.startTime, i.endTime, convertToConditions(exprs)) + // TODO: should pass correct shardID + chunks, err := ec.Search(*i.traceMetadata, 0, i.startTime, i.endTime, convertToConditions(exprs)) if err != nil { return nil, err } @@ -121,11 +123,12 @@ func (i *indexScan) Execute(ec executor2.ExecutionContext) ([]data.Entity, error chunkSet = chunks } else { // afterwards, it must not be nil - chunkSet = chunkSet.HashIntersect(chunks) + _ = chunkSet.Intersect(chunks) } } - return ec.FetchEntity(*i.traceMetadata, chunkSet, series.ScanOptions{ + //TODO: pass correct shardID + return ec.FetchEntity(*i.traceMetadata, 0, chunkSet, series.ScanOptions{ Projection: i.projectionFields, State: i.traceState, })
