This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index-posting
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 1400dd16ddbb511245cf19f475f2bab8fd5ed6ac
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jul 20 12:58:34 2021 +0800

    Introduce posting list
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 api/common/id.go                                 |  32 ----
 banyand/index/index.go                           |   3 +-
 banyand/series/idgen.go                          |  20 +--
 banyand/series/series.go                         |   3 +-
 banyand/series/trace/common_test.go              |  14 +-
 banyand/series/trace/query.go                    |  75 ++++++---
 banyand/series/trace/query_test.go               |  39 +++--
 banyand/series/trace/schema_test.go              |   2 +-
 banyand/series/trace/trace.go                    |   5 +-
 banyand/series/trace/write.go                    |   5 +-
 go.mod                                           |   1 +
 go.sum                                           |   8 +
 pkg/convert/number.go                            |  10 ++
 banyand/index/index.go => pkg/posting/posting.go |  64 +++++---
 pkg/posting/roaring/roaring.go                   | 194 +++++++++++++++++++++++
 pkg/query/logical/common_test.go                 |  31 ++--
 pkg/query/logical/plan_execution_test.go         |  12 +-
 pkg/query/logical/plan_indexscan.go              |  11 +-
 18 files changed, 386 insertions(+), 143 deletions(-)

diff --git a/api/common/id.go b/api/common/id.go
index b06573a..443ca83 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -23,35 +23,3 @@ type SeriesID uint64
 const (
        DataBinaryFieldName = "data_binary"
 )
-
-type ChunkIDs []ChunkID
-
-// HashIntersect returns an intersection of two ChunkID arrays
-// without any assumptions on the order. It uses a HashMap to mark
-// the existence of a item.
-func (c ChunkIDs) HashIntersect(other ChunkIDs) ChunkIDs {
-       if len(c) == 0 || len(other) == 0 {
-               return []ChunkID{}
-       }
-       smaller, larger, minLen := min(c, other)
-       intersection := make([]ChunkID, 0, minLen)
-       hash := make(map[ChunkID]struct{})
-       for _, item := range smaller {
-               hash[item] = struct{}{}
-       }
-       for _, item := range larger {
-               if _, exist := hash[item]; exist {
-                       intersection = append(intersection, item)
-               }
-       }
-       return intersection
-}
-
-func min(a, b ChunkIDs) (ChunkIDs, ChunkIDs, int) {
-       aLen := len(a)
-       bLen := len(b)
-       if aLen < bLen {
-               return a, b, aLen
-       }
-       return b, a, bLen
-}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index f8527c8..3c913ce 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -25,6 +25,7 @@ import (
        apiv1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/queue"
+       posting "github.com/apache/skywalking-banyandb/pkg/posting"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -35,7 +36,10 @@ type Condition struct {
 }
 
+// Repo searches indices for the chunks matching a set of conditions.
 type Repo interface {
-       Search(index common.Metadata, startTime, endTime uint64, conditions 
[]Condition) ([]common.ChunkID, error)
+       // Search returns the IDs of the chunks in shard shardID that satisfy
+       // conditions between startTime and endTime.
+       Search(index common.Metadata, shardID uint, startTime, endTime uint64, 
conditions []Condition) (posting.List, error)
 }
 
 type Builder interface {
diff --git a/banyand/series/idgen.go b/banyand/series/idgen.go
index e4d217a..9394c9d 100644
--- a/banyand/series/idgen.go
+++ b/banyand/series/idgen.go
@@ -21,16 +21,14 @@ import (
        "time"
 )
 
-const (
-       timeStampLen = 41 + 1
-       shardIDLen   = 22
-)
-
+// startTime is the epoch, in milliseconds, that chunk IDs are measured from.
 var startTime = uint64(time.Date(2021, 07, 01, 23, 0, 0, 0, 
time.UTC).UTC().UnixNano() / 1e6)
 
+// IDGen derives chunk IDs from entity timestamps.
 type IDGen interface {
-       Next(shard uint, ts uint64) uint64
-       ParseShardID(ID uint64) (uint, error)
+       // Next returns the chunk ID for an entity whose timestamp is ts.
+       Next(ts uint64) uint64
+       // ParseTS recovers the timestamp encoded in an ID returned by Next.
        ParseTS(ID uint64) (uint64, error)
 }
 
@@ -43,17 +41,17 @@ var _ IDGen = (*localID)(nil)
 type localID struct {
 }
 
-func (l *localID) Next(shard uint, ts uint64) uint64 {
+// Next converts ts, a timestamp in nanoseconds, into the number of
+// milliseconds elapsed since startTime and uses that as the ID.
+// NOTE(review): entities written in the same millisecond get the same ID,
+// and a ts earlier than startTime wraps around because the math is unsigned.
+func (l *localID) Next(ts uint64) uint64 {
        df := ts/1e6 - startTime
-       id := (df << shardIDLen) | uint64(shard)
-       return id
-}
-
-func (l *localID) ParseShardID(ID uint64) (uint, error) {
-       shardID := (ID << timeStampLen) >> timeStampLen
-       return uint(shardID), nil
+       return df
 }
 
+// ParseTS reverses Next, returning the ID's timestamp in nanoseconds
+// truncated to millisecond precision. It never returns an error.
 func (l *localID) ParseTS(ID uint64) (uint64, error) {
-       return (ID>>shardIDLen + startTime) * 1e6, nil
+       return (ID + startTime) * 1e6, nil
 }
diff --git a/banyand/series/series.go b/banyand/series/series.go
index db280f3..73a9e05 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/data"
        v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
        "github.com/apache/skywalking-banyandb/banyand/series/schema"
+       posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -50,7 +51,7 @@ type TraceRepo interface {
        //FetchTrace returns data.Trace by traceID
        FetchTrace(traceSeries common.Metadata, traceID string, opt 
ScanOptions) (data.Trace, error)
        //FetchEntity returns data.Entity by ChunkID
-       FetchEntity(traceSeries common.Metadata, chunkIDs []common.ChunkID, opt 
ScanOptions) ([]data.Entity, error)
+       FetchEntity(traceSeries common.Metadata, shardID uint, chunkIDs 
posting2.List, opt ScanOptions) ([]data.Entity, error)
        //ScanEntity returns data.Entity between a duration by ScanOptions
        ScanEntity(traceSeries common.Metadata, startTime, endTime uint64, opt 
ScanOptions) ([]data.Entity, error)
 }
diff --git a/banyand/series/trace/common_test.go 
b/banyand/series/trace/common_test.go
index fd0769a..acbc986 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -241,8 +241,8 @@ func getEntityWithTS(id string, binary []byte, ts uint64, 
items ...interface{})
        }
 }
 
-func setUpTestData(t *testing.T, ts *traceSeries, seriesEntities 
[]seriesEntity) (chunkIDs []common.ChunkID) {
-       chunkIDs = make([]common.ChunkID, 0, len(seriesEntities))
+func setUpTestData(t *testing.T, ts *traceSeries, seriesEntities 
[]seriesEntity) (results []idWithShard) {
+       results = make([]idWithShard, 0, len(seriesEntities))
        for _, se := range seriesEntities {
                seriesID := []byte(se.seriesID)
                b := fb.NewWriteEntityBuilder()
@@ -255,7 +255,8 @@ func setUpTestData(t *testing.T, ts *traceSeries, 
seriesEntities []seriesEntity)
                )
                assert.NoError(t, err)
                we := v1.GetRootAsWriteEntity(builder.FinishedBytes(), 0)
-               got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), 
partition.ShardID(seriesID, 2), data.EntityValue{
+               shardID := partition.ShardID(seriesID, 2)
+               got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), 
shardID, data.EntityValue{
                        EntityValue: we.Entity(nil),
                })
                if err != nil {
@@ -264,7 +265,10 @@ func setUpTestData(t *testing.T, ts *traceSeries, 
seriesEntities []seriesEntity)
                if got < 1 {
                        t.Error("Write() got empty chunkID")
                }
-               chunkIDs = append(chunkIDs, got)
+               results = append(results, idWithShard{
+                       id:      got,
+                       shardID: shardID,
+               })
        }
-       return chunkIDs
+       return results
 }
diff --git a/banyand/series/trace/query.go b/banyand/series/trace/query.go
index 4c2af23..7eb5bef 100644
--- a/banyand/series/trace/query.go
+++ b/banyand/series/trace/query.go
@@ -33,6 +33,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fb"
        "github.com/apache/skywalking-banyandb/pkg/partition"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
+       "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
 func (t *traceSeries) FetchTrace(traceID string, opt series.ScanOptions) 
(trace data.Trace, err error) {
@@ -52,13 +54,22 @@ func (t *traceSeries) FetchTrace(traceID string, opt 
series.ScanOptions) (trace
        if len(bb) < 1 {
                return trace, nil
        }
-       chunkIDs := make([]common.ChunkID, len(bb))
-       for i, b := range bb {
-               chunkIDs[i] = common.ChunkID(convert.BytesToUint64(b))
+       dataMap := make(map[uint]posting.List)
+       for _, b := range bb {
+               id := idWithShard{
+                       id:      common.ChunkID(convert.BytesToUint64(b[2:])),
+                       shardID: uint(convert.BytesToUint16(b[:2])),
+               }
+               placeID(dataMap, id)
        }
-       entities, errEntity := t.FetchEntity(chunkIDs, opt)
-       if errEntity != nil {
-               return trace, errEntity
+       var entities []data.Entity
+       for s, c := range dataMap {
+               ee, errEntity := t.FetchEntity(c, s, opt)
+               if errEntity != nil {
+                       err = multierr.Append(err, errEntity)
+                       continue
+               }
+               entities = append(entities, ee...)
        }
        return data.Trace{
                KindVersion: data.TraceKindVersion,
@@ -88,13 +99,14 @@ func (t *traceSeries) ScanEntity(startTime, endTime uint64, 
opt series.ScanOptio
                copy(key[1:], startTimeBytes)
                seekKeys = append(seekKeys, key)
        }
-       chunkIDs := make([]common.ChunkID, 0, total)
+       entities := make([]data.Entity, 0, total)
        var num uint32
        opts := kv.DefaultScanOpts
        opts.PrefetchValues = false
        opts.PrefetchSize = int(total)
        var errAll error
        for i := uint(0); i < t.shardNum; i++ {
+               chunkIDs := roaring.NewPostingList()
                for _, seekKey := range seekKeys {
                        state := seekKey[0]
                        err := t.reader.Reader(i, startTimeIndex, startTime, 
endTime).Scan(
@@ -113,8 +125,7 @@ func (t *traceSeries) ScanEntity(startTime, endTime uint64, 
opt series.ScanOptio
                                        }
                                        chunk := make([]byte, len(key)-8-1)
                                        copy(chunk, key[8+1:])
-                                       chunkID := 
common.ChunkID(convert.BytesToUint64(chunk))
-                                       chunkIDs = append(chunkIDs, chunkID)
+                                       
chunkIDs.Insert(common.ChunkID(convert.BytesToUint64(chunk)))
                                        num++
                                        if num > total {
                                                return kv.ErrStopScan
@@ -125,23 +136,25 @@ func (t *traceSeries) ScanEntity(startTime, endTime 
uint64, opt series.ScanOptio
                                errAll = multierr.Append(errAll, err)
                        }
                }
-       }
-       if len(chunkIDs) < 1 {
-               return nil, errAll
-       }
-       entities, err := t.FetchEntity(chunkIDs, opt)
-       if err != nil {
-               errAll = multierr.Append(errAll, err)
+               if chunkIDs.IsEmpty() {
+                       continue
+               }
+               ee, err := t.FetchEntity(chunkIDs, i, opt)
+               if err != nil {
+                       errAll = multierr.Append(errAll, err)
+                       continue
+               }
+               entities = append(entities, ee...)
        }
        return entities, errAll
 }
 
-func (t *traceSeries) FetchEntity(chunkIDs []common.ChunkID, opt 
series.ScanOptions) (entities []data.Entity, err error) {
-       chunkIDsLen := len(chunkIDs)
+func (t *traceSeries) FetchEntity(chunkIDs posting.List, shardID uint, opt 
series.ScanOptions) (entities []data.Entity, err error) {
+       chunkIDsLen := chunkIDs.Len()
        if chunkIDsLen < 1 {
                return nil, ErrChunkIDsEmpty
        }
-       entities = make([]data.Entity, 0, len(chunkIDs))
+       entities = make([]data.Entity, 0, chunkIDsLen)
        fetchDataBinary, fetchFieldsIndices, errInfo := t.parseFetchInfo(opt)
        if errInfo != nil {
                return nil, errInfo
@@ -149,12 +162,10 @@ func (t *traceSeries) FetchEntity(chunkIDs 
[]common.ChunkID, opt series.ScanOpti
        if !fetchDataBinary && len(fetchFieldsIndices) < 1 {
                return nil, ErrProjectionEmpty
        }
-       for _, id := range chunkIDs {
+
+       for iter := chunkIDs.Iterator(); iter.Next(); {
+               id := iter.Current()
                chunkID := uint64(id)
-               shardID, errParseID := t.idGen.ParseShardID(chunkID)
-               if errParseID != nil {
-                       err = multierr.Append(err, errParseID)
-               }
                ts, errParseTS := t.idGen.ParseTS(chunkID)
                if errParseTS != nil {
                        err = multierr.Append(err, errParseTS)
@@ -252,3 +263,20 @@ func (t *traceSeries) getEntityByInternalRef(seriesID 
[]byte, state State, fetch
                Entity: v1.GetRootAsEntity(b.FinishedBytes(), 0),
        }, nil
 }
+
+// idWithShard pairs a chunk ID with the shard the chunk was written to.
+type idWithShard struct {
+       id      common.ChunkID
+       shardID uint
+}
+
+// placeID inserts entry.id into the posting list kept for entry.shardID in
+// chunkIDCriteria, creating that list the first time the shard is seen.
+func placeID(chunkIDCriteria map[uint]posting.List, entry idWithShard) {
+       list, ok := chunkIDCriteria[entry.shardID]
+       if !ok {
+               list = roaring.NewPostingList()
+               chunkIDCriteria[entry.shardID] = list
+       }
+       list.Insert(entry.id)
+}
diff --git a/banyand/series/trace/query_test.go 
b/banyand/series/trace/query_test.go
index 49530db..841c670 100644
--- a/banyand/series/trace/query_test.go
+++ b/banyand/series/trace/query_test.go
@@ -24,15 +24,17 @@ import (
        "time"
 
        "github.com/stretchr/testify/assert"
+       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/series"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
 )
 
 func Test_traceSeries_FetchEntity(t *testing.T) {
        type args struct {
                chunkIDIndices []int
-               chunkIDs       common.ChunkIDs
+               chunkIDs       []idWithShard
                opt            series.ScanOptions
        }
        tests := []struct {
@@ -92,15 +94,23 @@ func Test_traceSeries_FetchEntity(t *testing.T) {
                {
                        name: "invalid chunk ids",
                        args: args{
-                               chunkIDs: common.ChunkIDs{0},
-                               opt:      series.ScanOptions{Projection: 
[]string{"trace_id", "data_binary"}},
+                               chunkIDs: []idWithShard{
+                                       {
+                                               id: common.ChunkID(0),
+                                       },
+                               },
+                               opt: series.ScanOptions{Projection: 
[]string{"trace_id", "data_binary"}},
                        },
                        wantErr: true,
                },
                {
                        name: "mix up invalid/valid ids",
                        args: args{
-                               chunkIDs:       common.ChunkIDs{0},
+                               chunkIDs: []idWithShard{
+                                       {
+                                               id: common.ChunkID(0),
+                                       },
+                               },
                                chunkIDIndices: []int{0, 1},
                                opt:            series.ScanOptions{Projection: 
[]string{"trace_id", "data_binary"}},
                        },
@@ -128,20 +138,29 @@ func Test_traceSeries_FetchEntity(t *testing.T) {
        }
        ts, stopFunc := setup(t)
        defer stopFunc()
-       chunkIDs := setUpTestData(t, ts, 
testData(uint64(time.Now().UnixNano())))
+       dataResult := setUpTestData(t, ts, 
testData(uint64(time.Now().UnixNano())))
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       chunkIDCriteria := make([]common.ChunkID, 0, 
len(tt.args.chunkIDIndices))
+                       chunkIDCriteria := make(map[uint]posting.List, 2)
                        for i := range tt.args.chunkIDIndices {
-                               chunkIDCriteria = append(chunkIDCriteria, 
chunkIDs[i])
+                               placeID(chunkIDCriteria, dataResult[i])
                        }
                        for _, id := range tt.args.chunkIDs {
-                               chunkIDCriteria = append(chunkIDCriteria, id)
+                               placeID(chunkIDCriteria, id)
+                       }
+                       var entities ByEntityID
+                       var err error
+                       for s, c := range chunkIDCriteria {
+                               ee, errFetch := ts.FetchEntity(c, s, 
tt.args.opt)
+                               if errFetch != nil {
+                                       err = multierr.Append(err, errFetch)
+                               }
+                               entities = append(entities, ee...)
                        }
-                       entities, err := ts.FetchEntity(chunkIDCriteria, 
tt.args.opt)
                        if (err != nil) != tt.wantErr {
                                t.Errorf("Write() error = %v, wantErr %v", err, 
tt.wantErr)
                        }
+                       sort.Sort(entities)
                        assert.Equal(t, len(tt.wantEntities), len(entities))
                        for i, e := range entities {
                                assert.EqualValues(t, 
tt.wantEntities[i].entityID, e.EntityId())
@@ -266,7 +285,7 @@ func Test_traceSeries_ScanEntity(t *testing.T) {
                {
                        name: "single result",
                        args: args{
-                               start: baseTS,
+                               start: 0,
                                end:   baseTS,
                        },
                        wantEntities: []wantEntity{
diff --git a/banyand/series/trace/schema_test.go 
b/banyand/series/trace/schema_test.go
index 57c0d1b..89b3369 100644
--- a/banyand/series/trace/schema_test.go
+++ b/banyand/series/trace/schema_test.go
@@ -59,7 +59,7 @@ func Test_service_RulesBySubject(t *testing.T) {
                        want: getIndexRule("sw-index-rule", "default"),
                },
                {
-                       name: "got empty result",
+                       name: "got empty result",
                        args: args{
                                series: createSubject("sw", "default"),
                                filter: func(object v1.IndexObject) bool {
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index e6aa39d..68b3015 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -39,6 +39,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/fb"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
 )
 
 const (
@@ -162,12 +163,14 @@ func (s *service) FetchTrace(traceSeries common.Metadata, 
traceID string, opt se
        return ts.FetchTrace(traceID, opt)
 }
 
-func (s *service) FetchEntity(traceSeries common.Metadata, chunkIDs 
[]common.ChunkID, opt series.ScanOptions) ([]data.Entity, error) {
+// FetchEntity resolves traceSeries and fetches the entities for chunkIDs
+// from shard shardID of that series.
+func (s *service) FetchEntity(traceSeries common.Metadata, shardID uint, 
chunkIDs posting2.List, opt series.ScanOptions) ([]data.Entity, error) {
        ts, err := s.getSeries(traceSeries)
        if err != nil {
                return nil, err
        }
-       return ts.FetchEntity(chunkIDs, opt)
+       return ts.FetchEntity(chunkIDs, shardID, opt)
 }
 
 func (s *service) ScanEntity(traceSeries common.Metadata, startTime, endTime 
uint64, opt series.ScanOptions) ([]data.Entity, error) {
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 7b78b7d..5d28af7 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -44,7 +44,8 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID 
uint, entity data.
                return 0, errGetState
        }
        stateBytes := []byte{byte(state)}
-       chunkID := t.idGen.Next(shardID, entity.TimestampNanoseconds())
+       tts := entity.TimestampNanoseconds()
+       chunkID := t.idGen.Next(tts)
        ts, errParseTS := t.idGen.ParseTS(chunkID)
        if errParseTS != nil {
                return 0, errors.Wrap(errParseTS, "failed to parse timestamp 
from chunk id")
@@ -70,7 +71,7 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID 
uint, entity data.
                return 0, errors.Wrap(err, "failed to write chunkID index")
        }
        traceIDShardID := partition.ShardID(traceID, t.shardNum)
-       if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).Put(traceID, 
chunkIDBytes, entity.TimestampNanoseconds()); err != nil {
+       if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).Put(traceID, 
bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), 
entity.TimestampNanoseconds()); err != nil {
                return 0, errors.Wrap(err, "failed to Trace index")
        }
        err = wp.Writer(shardID, 
startTimeIndex).Put(bydb_bytes.Join(stateBytes, tsBytes, chunkIDBytes), nil)
diff --git a/go.mod b/go.mod
index 2f971df..317ce5a 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/apache/skywalking-banyandb
 go 1.16
 
 require (
+       github.com/RoaringBitmap/roaring v0.9.1
        github.com/cespare/xxhash v1.1.0
        github.com/dgraph-io/badger/v3 v3.2011.1
        github.com/golang/mock v1.3.1
diff --git a/go.sum b/go.sum
index 24a16ba..a1c449b 100644
--- a/go.sum
+++ b/go.sum
@@ -16,6 +16,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod 
h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/OneOfOne/xxhash v1.2.2 
h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
 github.com/OneOfOne/xxhash v1.2.2/go.mod 
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/RoaringBitmap/gocroaring v0.4.0/go.mod 
h1:NieMwz7ZqwU2DD73/vvYwv7r4eWBKuPVSXZIpsaMwCI=
+github.com/RoaringBitmap/real-roaring-datasets 
v0.0.0-20190726190000-eb7c87156f76/go.mod 
h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
+github.com/RoaringBitmap/roaring v0.9.1 
h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
+github.com/RoaringBitmap/roaring v0.9.1/go.mod 
h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
 github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74 
h1:BFHSkDBSYCtPxMgxGz07DfNRYS76KFVDlocQ2U9rY7E=
 github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74/go.mod 
h1:XieWaNygSGj5ZzSsZO4tQe/2wwLjCvESus4twFqxOKc=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -27,6 +31,8 @@ github.com/armon/go-radix 
v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/bgentry/speakeasy v0.1.0/go.mod 
h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/bits-and-blooms/bitset v1.2.0 
h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA=
+github.com/bits-and-blooms/bitset v1.2.0/go.mod 
h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
 github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod 
h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod 
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash v1.1.0 
h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
@@ -182,6 +188,8 @@ github.com/mitchellh/mapstructure v1.1.2 
h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
 github.com/mitchellh/mapstructure v1.1.2/go.mod 
h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.1/go.mod 
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
+github.com/mschoch/smat v0.2.0/go.mod 
h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod 
h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
 github.com/oklog/run v1.1.0/go.mod 
h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index 3ad4948..c06d7a4 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -31,6 +31,13 @@ func Int64ToBytes(i int64) []byte {
        return buf
 }
 
+// Uint16ToBytes encodes u as a 2-byte big-endian slice.
+func Uint16ToBytes(u uint16) []byte {
+       bs := make([]byte, 2)
+       binary.BigEndian.PutUint16(bs, u)
+       return bs
+}
+
 func Uint32ToBytes(u uint32) []byte {
        bs := make([]byte, 4)
        binary.BigEndian.PutUint32(bs, u)
@@ -45,6 +52,12 @@ func BytesToUint32(b []byte) uint32 {
        return binary.BigEndian.Uint32(b)
 }
 
+// BytesToUint16 decodes the first 2 bytes of b as a big-endian uint16. It
+// panics if len(b) < 2.
+func BytesToUint16(b []byte) uint16 {
+       return binary.BigEndian.Uint16(b)
+}
+
 func IntToInt64(numbers ...int) []int64 {
        var arr []int64
        for i := 0; i < len(numbers); i++ {
diff --git a/banyand/index/index.go b/pkg/posting/posting.go
similarity index 54%
copy from banyand/index/index.go
copy to pkg/posting/posting.go
index f8527c8..3812162 100644
--- a/banyand/index/index.go
+++ b/pkg/posting/posting.go
@@ -15,39 +15,79 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//go:generate mockgen -destination=./index_mock.go -package=index . Repo
-package index
+// Package posting defines List, a set of common.ChunkID values, and the
+// Iterator used to walk it.
+package posting
 
 import (
-       "context"
+       "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       apiv1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
-       "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-type Condition struct {
-       Key    string
-       Values [][]byte
-       Op     apiv1.BinaryOp
-}
+// ErrListEmpty is returned by List.Max when the list holds no IDs.
+var ErrListEmpty = errors.New("postings list is empty")
 
-type Repo interface {
-       Search(index common.Metadata, startTime, endTime uint64, conditions 
[]Condition) ([]common.ChunkID, error)
-}
+// List is a collection of common.ChunkID.
+type List interface {
+       // Contains reports whether id is in the list.
+       Contains(id common.ChunkID) bool
 
-type Builder interface {
-       run.Config
-       run.PreRunner
-}
+       // IsEmpty reports whether the list holds no IDs.
+       IsEmpty() bool
+
+       // Max returns the largest ID, or ErrListEmpty if the list is empty.
+       Max() (common.ChunkID, error)
+
+       // Len returns the number of IDs in the list.
+       Len() int
+
+       // Iterator returns an Iterator over the IDs in the list.
+       Iterator() Iterator
+
+       // Clone returns a copy that can be modified independently.
+       Clone() List
+
+       // Equal reports whether the list and other hold the same IDs.
+       Equal(other List) bool
+
+       // Insert adds i to the list.
+       Insert(i common.ChunkID)
 
-type Service interface {
-       Repo
-       Builder
+       // Intersect keeps, in place, only the IDs also present in other.
+       // Implementations may return an error for an unsupported other.
+       Intersect(other List) error
+
+       // Difference removes, in place, every ID also present in other.
+       Difference(other List) error
+
+       // Union adds, in place, every ID present in other.
+       Union(other List) error
+
+       // UnionMany unions each list in others into this one.
+       UnionMany(others []List) error
+
+       // AddIterator inserts every ID produced by iter.
+       AddIterator(iter Iterator) error
+
+       // AddRange inserts every ID in the half-open interval [min, max).
+       AddRange(min, max common.ChunkID) error
+
+       // RemoveRange removes every ID in the half-open interval [min, max).
+       RemoveRange(min, max common.ChunkID) error
+
+       // Reset removes all IDs from the list.
+       Reset()
 }
 
-func NewService(ctx context.Context, repo discovery.ServiceRepo, pipeline 
queue.Queue) (Service, error) {
-       return nil, nil
+// Iterator walks the IDs of a List; call Next before reading Current.
+type Iterator interface {
+       // Next advances to the next ID and reports whether one was available.
+       Next() bool
+
+       // Current returns the ID reached by the last successful Next.
+       Current() common.ChunkID
+
+       // Close ends the iteration.
+       Close() error
 }
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
new file mode 100644
index 0000000..7c778d0
--- /dev/null
+++ b/pkg/posting/roaring/roaring.go
@@ -0,0 +1,239 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package roaring implements posting.List on top of a 64-bit Roaring Bitmap.
+package roaring
+
+import (
+       "github.com/RoaringBitmap/roaring/roaring64"
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
+)
+
+// Errors returned by the set operations when the other operand is a
+// posting.List that was not created by this package.
+var (
+       ErrIntersectRoaringOnly  = errors.New("Intersect only supported between 
roaringDocId sets")
+       ErrUnionRoaringOnly      = errors.New("Union only supported between 
roaringDocId sets")
+       ErrDifferenceRoaringOnly = errors.New("Difference only supported 
between roaringDocId sets")
+)
+
+var _ posting.List = (*postingsList)(nil)
+
+// postingsList abstracts a Roaring Bitmap.
+type postingsList struct {
+       bitmap *roaring64.Bitmap
+}
+
+// NewPostingList returns an empty posting.List backed by a Roaring Bitmap.
+func NewPostingList() posting.List {
+       return &postingsList{
+               bitmap: roaring64.New(),
+       }
+}
+
+// NewPostingListWithInitialData returns a posting.List holding every value in
+// data; duplicate values are stored once.
+func NewPostingListWithInitialData(data ...uint64) posting.List {
+       list := NewPostingList()
+       for _, d := range data {
+               list.Insert(common.ChunkID(d))
+       }
+       return list
+}
+
+// Contains reports whether id is in the list.
+func (p *postingsList) Contains(id common.ChunkID) bool {
+       return p.bitmap.Contains(uint64(id))
+}
+
+// IsEmpty reports whether the list holds no IDs.
+func (p *postingsList) IsEmpty() bool {
+       return p.bitmap.IsEmpty()
+}
+
+// Max returns the largest ID in the list, or posting.ErrListEmpty when the
+// list is empty.
+func (p *postingsList) Max() (common.ChunkID, error) {
+       if p.IsEmpty() {
+               return 0, posting.ErrListEmpty
+       }
+       return common.ChunkID(p.bitmap.Maximum()), nil
+}
+
+// Len returns the number of IDs in the list.
+func (p *postingsList) Len() int {
+       return int(p.bitmap.GetCardinality())
+}
+
+// Iterator returns an iterator over the IDs in the list, in ascending order.
+func (p *postingsList) Iterator() posting.Iterator {
+       return &roaringIterator{
+               iter: p.bitmap.Iterator(),
+       }
+}
+
+// Clone returns a copy of the list that can be modified independently.
+func (p *postingsList) Clone() posting.List {
+       return &postingsList{
+               bitmap: p.bitmap.Clone(),
+       }
+}
+
+// Equal reports whether the list and other hold exactly the same IDs.
+func (p *postingsList) Equal(other posting.List) bool {
+       if p.Len() != other.Len() {
+               return false
+       }
+       // Two bitmaps are compared directly instead of walking both iterators.
+       if o, ok := other.(*postingsList); ok {
+               return p.bitmap.Equals(o.bitmap)
+       }
+
+       // Other List implementations are compared element by element, which
+       // assumes they also iterate in ascending order.
+       iter := p.Iterator()
+       otherIter := other.Iterator()
+
+       for iter.Next() {
+               if !otherIter.Next() {
+                       return false
+               }
+               if iter.Current() != otherIter.Current() {
+                       return false
+               }
+       }
+
+       return true
+}
+
+// Insert adds id to the list; inserting an ID that is already present is a
+// no-op.
+func (p *postingsList) Insert(id common.ChunkID) {
+       p.bitmap.Add(uint64(id))
+}
+
+// Intersect keeps only the IDs also present in other. other must have been
+// created by this package, otherwise ErrIntersectRoaringOnly is returned.
+func (p *postingsList) Intersect(other posting.List) error {
+       o, ok := other.(*postingsList)
+       if !ok {
+               return ErrIntersectRoaringOnly
+       }
+       p.bitmap.And(o.bitmap)
+       return nil
+}
+
+// Difference removes every ID also present in other. other must have been
+// created by this package, otherwise ErrDifferenceRoaringOnly is returned.
+func (p *postingsList) Difference(other posting.List) error {
+       o, ok := other.(*postingsList)
+       if !ok {
+               return ErrDifferenceRoaringOnly
+       }
+       p.bitmap.AndNot(o.bitmap)
+       return nil
+}
+
+// Union adds every ID present in other. other must have been created by this
+// package, otherwise ErrUnionRoaringOnly is returned.
+func (p *postingsList) Union(other posting.List) error {
+       o, ok := other.(*postingsList)
+       if !ok {
+               return ErrUnionRoaringOnly
+       }
+       p.bitmap.Or(o.bitmap)
+       return nil
+}
+
+// UnionMany unions every list in others into p. It stops at the first error;
+// lists merged before that point stay merged.
+func (p *postingsList) UnionMany(others []posting.List) error {
+       for _, other := range others {
+               err := p.Union(other)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// AddIterator inserts every ID produced by iter. It does not close iter.
+func (p *postingsList) AddIterator(iter posting.Iterator) error {
+       for iter.Next() {
+               p.Insert(iter.Current())
+       }
+       return nil
+}
+
+// AddRange inserts every ID in the half-open interval [min, max); it is a
+// no-op when min >= max.
+func (p *postingsList) AddRange(min, max common.ChunkID) error {
+       if min >= max {
+               return nil
+       }
+       // Bulk insertion avoids one Add call per ID, which matters for wide
+       // ranges.
+       p.bitmap.AddRange(uint64(min), uint64(max))
+       return nil
+}
+
+// RemoveRange deletes every ID in the half-open interval [min, max); it is a
+// no-op when min >= max.
+func (p *postingsList) RemoveRange(min, max common.ChunkID) error {
+       if min >= max {
+               return nil
+       }
+       p.bitmap.RemoveRange(uint64(min), uint64(max))
+       return nil
+}
+
+// Reset removes every ID from the list.
+func (p *postingsList) Reset() {
+       p.bitmap.Clear()
+}
+
+// roaringIterator adapts a roaring64 iterator to posting.Iterator.
+type roaringIterator struct {
+       iter    roaring64.IntIterable64
+       current common.ChunkID
+       closed  bool
+}
+
+// Current returns the ID reached by the most recent successful Next.
+func (it *roaringIterator) Current() common.ChunkID {
+       return it.current
+}
+
+// Next advances to the following ID and reports whether one was available.
+// It always returns false once Close has been called.
+func (it *roaringIterator) Next() bool {
+       if it.closed || !it.iter.HasNext() {
+               return false
+       }
+       v := it.iter.Next()
+       it.current = common.ChunkID(v)
+       return true
+}
+
+// Close stops the iteration; it never returns an error.
+func (it *roaringIterator) Close() error {
+       it.closed = true
+       return nil
+}
diff --git a/pkg/query/logical/common_test.go b/pkg/query/logical/common_test.go
index 36c4e26..a455e98 100644
--- a/pkg/query/logical/common_test.go
+++ b/pkg/query/logical/common_test.go
@@ -35,6 +35,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/index"
        "github.com/apache/skywalking-banyandb/banyand/series"
        "github.com/apache/skywalking-banyandb/pkg/fb"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        executor2 "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
@@ -76,25 +77,24 @@ func GeneratorFromRange(l, r common.ChunkID) ChunkIDGenerator {
 var _ ChunkIDGenerator = (*arrayChunkIDGenerator)(nil)
 
+// arrayChunkIDGenerator adapts a posting.List iterator to ChunkIDGenerator.
+// It keeps the iterator one step ahead: iter is positioned on the element
+// the next call to Next returns, and hasNext caches whether that element
+// exists.
 type arrayChunkIDGenerator struct {
-       chunkIDs []common.ChunkID
-       ptr      int
+       hasNext bool
+       iter    posting.Iterator
 }
 
+// Next returns the chunk ID the iterator is positioned on, then advances the
+// iterator and refreshes hasNext. Call it only while HasNext reports true.
 func (a *arrayChunkIDGenerator) Next() common.ChunkID {
-       defer func() {
-               a.ptr++
-       }()
-       return a.chunkIDs[a.ptr]
+       // Read before advancing: advancing first would skip the element primed
+       // by GeneratorFromArray and repeat the last element once exhausted.
+       current := a.iter.Current()
+       a.hasNext = a.iter.Next()
+       return current
 }
 
+// HasNext reports whether Next has another chunk ID to return.
 func (a *arrayChunkIDGenerator) HasNext() bool {
-       return a.ptr < len(a.chunkIDs)
+       return a.hasNext
 }
 
-func GeneratorFromArray(chunkIDs []common.ChunkID) ChunkIDGenerator {
+// GeneratorFromArray returns a ChunkIDGenerator that yields every chunk ID of
+// chunkIDs in the list's iteration order. A nil list yields nothing.
+func GeneratorFromArray(chunkIDs posting.List) ChunkIDGenerator {
+       if chunkIDs == nil {
+               return &arrayChunkIDGenerator{}
+       }
+       iter := chunkIDs.Iterator()
+       // Prime the look-ahead so the iterator sits on the first element, if any.
        return &arrayChunkIDGenerator{
-               chunkIDs: chunkIDs,
-               ptr:      0,
+               iter:    iter,
+               hasNext: iter.Next(),
        }
 }
 
@@ -152,18 +152,19 @@ func (f *mockDataFactory) MockTraceIDFetch(traceID string) executor2.ExecutionCo
        return ec
 }
 
+//TODO: pass correct shardID
 func (f *mockDataFactory) MockIndexScan(startTime, endTime time.Time, 
indexMatches ...*indexMatcher) executor2.ExecutionContext {
        ec := executor.NewMockExecutionContext(f.ctrl)
        for _, im := range indexMatches {
                ec.
                        EXPECT().
-                       Search(*f.traceMetadata, uint64(startTime.UnixNano()), 
uint64(endTime.UnixNano()), im).
+                       Search(*f.traceMetadata, uint(0), 
uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), im).
                        Return(im.chunkIDs, nil)
        }
        ec.
                EXPECT().
-               FetchEntity(*f.traceMetadata, gomock.Any(), gomock.Any()).
-               DoAndReturn(func(_ common.Metadata, chunkIDs []common.ChunkID, 
_ series.ScanOptions) ([]data.Entity, error) {
+               FetchEntity(*f.traceMetadata, uint(0), gomock.Any(), 
gomock.Any()).
+               DoAndReturn(func(_ common.Metadata, _ uint, chunkIDs 
posting.List, _ series.ScanOptions) ([]data.Entity, error) {
                        return GenerateEntities(GeneratorFromArray(chunkIDs)), 
nil
                })
        return ec
@@ -196,7 +197,7 @@ var _ gomock.Matcher = (*indexMatcher)(nil)
 
+// indexMatcher is the gomock.Matcher used to recognise a Search call for the
+// index key named by key; chunkIDs is the posting list the mocked Search
+// returns for that call.
 type indexMatcher struct {
        key      string
-       chunkIDs []common.ChunkID
+       chunkIDs posting.List
 }
 
 func (i *indexMatcher) Matches(x interface{}) bool {
@@ -214,7 +215,7 @@ func (i *indexMatcher) String() string {
        return fmt.Sprintf("is search for key %s", i.key)
 }
 
-func NewIndexMatcher(key string, chunkIDs []common.ChunkID) *indexMatcher {
+func NewIndexMatcher(key string, chunkIDs posting.List) *indexMatcher {
        return &indexMatcher{
                key:      key,
                chunkIDs: chunkIDs,
diff --git a/pkg/query/logical/plan_execution_test.go b/pkg/query/logical/plan_execution_test.go
index 6c8682f..1aaade8 100644
--- a/pkg/query/logical/plan_execution_test.go
+++ b/pkg/query/logical/plan_execution_test.go
@@ -24,9 +24,9 @@ import (
        "github.com/golang/mock/gomock"
        "github.com/stretchr/testify/require"
 
-       "github.com/apache/skywalking-banyandb/api/common"
        apiv1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
        "github.com/apache/skywalking-banyandb/banyand/series"
+       "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical2 "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
@@ -156,7 +156,7 @@ func TestPlanExecution_IndexScan(t *testing.T) {
                        unresolvedPlan: 
logical2.IndexScan(uint64(st.UnixNano()), uint64(et.UnixNano()), m, 
[]logical2.Expr{
                                
logical2.Eq(logical2.NewFieldRef("http.method"), logical2.Str("GET")),
                        }, series.TraceStateDefault),
-                       indexMatchers: 
[]*indexMatcher{NewIndexMatcher("http.method", []common.ChunkID{1, 2, 3, 4, 5, 
6, 7, 8, 9, 10})},
+                       indexMatchers: 
[]*indexMatcher{NewIndexMatcher("http.method", 
roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))},
                        wantLength:    10,
                },
                {
@@ -166,8 +166,8 @@ func TestPlanExecution_IndexScan(t *testing.T) {
                                
logical2.Eq(logical2.NewFieldRef("status_code"), logical2.Str("200")),
                        }, series.TraceStateDefault),
                        indexMatchers: []*indexMatcher{
-                               NewIndexMatcher("http.method", 
[]common.ChunkID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
-                               NewIndexMatcher("status_code", 
[]common.ChunkID{1, 3, 5, 7, 9}),
+                               NewIndexMatcher("http.method", 
roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
+                               NewIndexMatcher("status_code", 
roaring.NewPostingListWithInitialData(1, 3, 5, 7, 9)),
                        },
                        wantLength: 5,
                },
@@ -178,8 +178,8 @@ func TestPlanExecution_IndexScan(t *testing.T) {
                                
logical2.Eq(logical2.NewFieldRef("status_code"), logical2.Str("200")),
                        }, series.TraceStateDefault),
                        indexMatchers: []*indexMatcher{
-                               NewIndexMatcher("http.method", 
[]common.ChunkID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
-                               NewIndexMatcher("status_code", 
[]common.ChunkID{}),
+                               NewIndexMatcher("http.method", 
roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
+                               NewIndexMatcher("status_code", 
roaring.NewPostingList()),
                        },
                        wantLength: 0,
                },
diff --git a/pkg/query/logical/plan_indexscan.go b/pkg/query/logical/plan_indexscan.go
index b64c885..484edab 100644
--- a/pkg/query/logical/plan_indexscan.go
+++ b/pkg/query/logical/plan_indexscan.go
@@ -29,6 +29,7 @@ import (
        apiv1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
        "github.com/apache/skywalking-banyandb/banyand/index"
        "github.com/apache/skywalking-banyandb/banyand/series"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
        executor2 "github.com/apache/skywalking-banyandb/pkg/query/executor"
 )
 
@@ -107,12 +108,13 @@ type indexScan struct {
 }
 
 func (i *indexScan) Execute(ec executor2.ExecutionContext) ([]data.Entity, 
error) {
-       var chunkSet common.ChunkIDs
+       var chunkSet posting.List
        for _, exprs := range i.conditionMap {
                // TODO: Discuss which metadata should be used!
                // 1) traceSeries Metadata: indirect mapping
                // 2) indexRule Metadata: cannot uniquely determine traceSeries
-               chunks, err := ec.Search(*i.traceMetadata, i.startTime, 
i.endTime, convertToConditions(exprs))
+               // TODO: should pass correct shardID
+               chunks, err := ec.Search(*i.traceMetadata, 0, i.startTime, 
i.endTime, convertToConditions(exprs))
                if err != nil {
                        return nil, err
                }
@@ -121,11 +123,12 @@ func (i *indexScan) Execute(ec executor2.ExecutionContext) ([]data.Entity, error
                        chunkSet = chunks
                } else {
                        // afterwards, it must not be nil
-                       chunkSet = chunkSet.HashIntersect(chunks)
+                       _ = chunkSet.Intersect(chunks)
                }
        }
 
-       return ec.FetchEntity(*i.traceMetadata, chunkSet, series.ScanOptions{
+       //TODO: pass correct shardID
+       return ec.FetchEntity(*i.traceMetadata, 0, chunkSet, series.ScanOptions{
                Projection: i.projectionFields,
                State:      i.traceState,
        })

Reply via email to