This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch stream in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit f6057ef76ddcc3d330bc94aeb890f0d04b5bf357 Author: Gao Hongtao <[email protected]> AuthorDate: Fri Jan 17 08:13:25 2025 +0800 Introduce batch scan to stream query by timestamp Signed-off-by: Gao Hongtao <[email protected]> --- banyand/measure/topn.go | 2 +- banyand/stream/block_scanner.go | 212 +++++++++ banyand/stream/index.go | 7 +- banyand/stream/merger_policy.go | 8 - banyand/stream/query.go | 500 +++------------------ banyand/stream/query_by_idx.go | 321 ++++++++++++++ banyand/stream/query_by_ts.go | 265 +++++++++++ banyand/stream/query_test.go | 424 ------------------ banyand/stream/stream.go | 14 +- banyand/stream/trace.go | 2 +- banyand/stream/tstable_test.go | 7 - pkg/pb/v1/value.go | 2 +- pkg/query/logical/optimizer.go | 2 +- pkg/query/model/model.go | 158 +++++++ pkg/query/model/model_test.go | 947 ++++++++++++++++++++++++++++++++++++++++ 15 files changed, 1976 insertions(+), 895 deletions(-) diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go index c2f7c40f..4170aef6 100644 --- a/banyand/measure/topn.go +++ b/banyand/measure/topn.go @@ -676,7 +676,7 @@ func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder) e for i, ev := range t.entityValues { t.buf, t.entities[i], err = pbv1.UnmarshalTagValues(t.buf, t.entities[i], ev) if err != nil { - return fmt.Errorf("cannot unmarshal topNValue.entityValues[%d]: %w", i, err) + return fmt.Errorf("cannot unmarshal topNValue.entityValues[%d]:%s %w", i, ev, err) } if len(t.entities[i]) != len(t.entityTagNames) { return fmt.Errorf("entityValues[%d] length is not equal to entityTagNames", i) diff --git a/banyand/stream/block_scanner.go b/banyand/stream/block_scanner.go new file mode 100644 index 00000000..dff5ecce --- /dev/null +++ b/banyand/stream/block_scanner.go @@ -0,0 +1,212 @@ +package stream + +import ( + "context" + "fmt" + "sort" + "sync" + + "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + 
"github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/index/posting" + "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" + logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" +) + +const blockScannerBatchSize = 32 + +type blockScanResult struct { + p *part + qo queryOptions + bm blockMetadata +} + +func (bs *blockScanResult) reset() { + bs.p = nil + bs.bm.reset() +} + +type blockScanResultBatch struct { + err error + bss []blockScanResult +} + +func (bsb *blockScanResultBatch) reset() { + for i := range bsb.bss { + bsb.bss[i].reset() + } + bsb.bss = bsb.bss[:0] +} + +func generateBlockScanResultBatch() *blockScanResultBatch { + v := blockScanResultBatchPool.Get() + if v == nil { + return &blockScanResultBatch{ + bss: make([]blockScanResult, 0, blockScannerBatchSize), + } + } + return v +} + +func releaseBlockScanResultBatch(bsb *blockScanResultBatch) { + bsb.reset() + blockScanResultBatchPool.Put(bsb) +} + +var blockScanResultBatchPool = pool.Register[*blockScanResultBatch]("stream-blockScannerBatch") + +var shardScanConcurrencyCh = make(chan struct{}, cgroups.CPUs()) + +type blockScanner struct { + segment storage.Segment[*tsTable, *option] + series []*pbv1.Series + seriesIDs []uint64 + qo queryOptions +} + +func (q *blockScanner) searchSeries(ctx context.Context) error { + seriesFilter := roaring.NewPostingList() + sl, err := q.segment.Lookup(ctx, q.series) + if err != nil { + return err + } + for i := range sl { + if seriesFilter.Contains(uint64(sl[i].ID)) { + continue + } + seriesFilter.Insert(uint64(sl[i].ID)) + if q.qo.seriesToEntity == nil { + q.qo.seriesToEntity = make(map[common.SeriesID][]*modelv1.TagValue) + } + q.qo.seriesToEntity[sl[i].ID] = sl[i].EntityValues + q.qo.sortedSids = 
append(q.qo.sortedSids, sl[i].ID) + } + if seriesFilter.IsEmpty() { + return nil + } + q.seriesIDs = seriesFilter.ToSlice() + sort.Slice(q.qo.sortedSids, func(i, j int) bool { return q.qo.sortedSids[i] < q.qo.sortedSids[j] }) + return nil +} + +func (q *blockScanner) scanShardsInParallel(ctx context.Context, wg *sync.WaitGroup, blockCh chan *blockScanResultBatch) []scanFinalizer { + tabs := q.segment.Tables() + finalizers := make([]scanFinalizer, len(tabs)) + for i := range tabs { + select { + case shardScanConcurrencyCh <- struct{}{}: + case <-ctx.Done(): + return finalizers + } + wg.Add(1) + go func(idx int, tab *tsTable) { + finalizers[idx] = q.scanBlocks(ctx, q.seriesIDs, tab, blockCh) + wg.Done() + <-shardScanConcurrencyCh + }(i, tabs[i]) + } + return finalizers +} + +func (q *blockScanner) scanBlocks(ctx context.Context, seriesList []uint64, tab *tsTable, blockCh chan *blockScanResultBatch) (sf scanFinalizer) { + s := tab.currentSnapshot() + if s == nil { + return nil + } + sf = s.decRef + filter, err := q.indexSearch(ctx, seriesList, tab) + if err != nil { + select { + case blockCh <- &blockScanResultBatch{err: err}: + case <-ctx.Done(): + } + return + } + select { + case <-ctx.Done(): + return + default: + } + + parts, n := s.getParts(nil, q.qo.minTimestamp, q.qo.maxTimestamp) + if n < 1 { + return + } + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) + ti := generateTstIter() + defer releaseTstIter(ti) + ti.init(bma, parts, q.qo.sortedSids, q.qo.minTimestamp, q.qo.maxTimestamp) + batch := generateBlockScanResultBatch() + if ti.Error() != nil { + batch.err = fmt.Errorf("cannot init tstIter: %w", ti.Error()) + select { + case blockCh <- batch: + case <-ctx.Done(): + releaseBlockScanResultBatch(batch) + } + return + } + for ti.nextBlock() { + p := ti.piHeap[0] + batch.bss = append(batch.bss, blockScanResult{ + p: p.p, + }) + bs := &batch.bss[len(batch.bss)-1] + bs.qo.copyFrom(&q.qo) + bs.qo.elementFilter = filter + 
bs.bm.copyFrom(p.curBlock) + if len(batch.bss) >= cap(batch.bss) { + select { + case blockCh <- batch: + case <-ctx.Done(): + releaseBlockScanResultBatch(batch) + return + } + batch = generateBlockScanResultBatch() + } + } + if ti.Error() != nil { + batch.err = fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) + select { + case blockCh <- batch: + case <-ctx.Done(): + releaseBlockScanResultBatch(batch) + } + return + } + if len(batch.bss) > 0 { + select { + case blockCh <- batch: + case <-ctx.Done(): + releaseBlockScanResultBatch(batch) + } + return + } + releaseBlockScanResultBatch(batch) + return +} + +func (q *blockScanner) indexSearch(ctx context.Context, seriesList []uint64, tw *tsTable) (posting.List, error) { + if q.qo.Filter == nil || q.qo.Filter == logicalstream.ENode { + return nil, nil + } + pl, err := tw.Index().Search(ctx, seriesList, q.qo.Filter) + if err != nil { + return nil, err + } + if pl == nil { + return roaring.DummyPostingList, nil + } + return pl, nil +} + +func (q *blockScanner) close() { + q.segment.DecRef() +} + +type scanFinalizer func() diff --git a/banyand/stream/index.go b/banyand/stream/index.go index 128308cd..a21f4b62 100644 --- a/banyand/stream/index.go +++ b/banyand/stream/index.go @@ -30,7 +30,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -71,9 +70,9 @@ func (e *elementIndex) Write(docs index.Documents) error { }) } -func (e *elementIndex) Search(ctx context.Context, seriesList pbv1.SeriesList, filter index.Filter) (posting.List, error) { +func (e *elementIndex) Search(ctx context.Context, seriesList []uint64, filter index.Filter) (posting.List, error) { var result posting.List - for i, series := range seriesList { + for i, id := range seriesList { select { case <-ctx.Done(): 
return nil, errors.WithMessagef(ctx.Err(), "search series %d/%d", i, len(seriesList)) @@ -81,7 +80,7 @@ func (e *elementIndex) Search(ctx context.Context, seriesList pbv1.SeriesList, f } pl, err := filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { return e.store, nil - }, series.ID) + }, common.SeriesID(id)) if err != nil { return nil, err } diff --git a/banyand/stream/merger_policy.go b/banyand/stream/merger_policy.go index ad881e58..97e62459 100644 --- a/banyand/stream/merger_policy.go +++ b/banyand/stream/merger_policy.go @@ -37,14 +37,6 @@ func newDefaultMergePolicy() *mergePolicy { return newMergePolicy(15, 1.7, run.Bytes(math.MaxInt64)) } -func newDefaultMergePolicyForTesting() *mergePolicy { - return newMergePolicy(3, 1, run.Bytes(math.MaxInt64)) -} - -func newDisabledMergePolicyForTesting() *mergePolicy { - return newMergePolicy(0, 0, 0) -} - // NewMergePolicy creates a MergePolicy with given parameters. func newMergePolicy(maxParts int, minMergeMul float64, maxFanOutSize run.Bytes) *mergePolicy { return &mergePolicy{ diff --git a/banyand/stream/query.go b/banyand/stream/query.go index e375ae16..062663c8 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -18,10 +18,8 @@ package stream import ( - "container/heap" "context" "fmt" - "sort" "github.com/pkg/errors" @@ -35,9 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" - "github.com/apache/skywalking-banyandb/pkg/query" logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" "github.com/apache/skywalking-banyandb/pkg/query/model" ) @@ -57,15 +53,14 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m if db == nil { return nilResult, nil } - var result 
queryResult tsdb := db.(storage.TSDB[*tsTable, option]) - result.segments = tsdb.SelectSegments(*sqo.TimeRange) - if len(result.segments) < 1 { - return &result, nil + segments := tsdb.SelectSegments(*sqo.TimeRange) + if len(segments) < 1 { + return bypassQueryResultInstance, nil } defer func() { if err != nil { - result.Release() + sqr.Release() } }() series := make([]*pbv1.Series, len(sqo.Entities)) @@ -75,7 +70,36 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m EntityValues: sqo.Entities[i], } } - var seriesList, sl pbv1.SeriesList + qo := queryOptions{ + StreamQueryOptions: sqo, + minTimestamp: sqo.TimeRange.Start.UnixNano(), + maxTimestamp: sqo.TimeRange.End.UnixNano(), + } + + if sqo.Order == nil || sqo.Order.Index == nil { + result := &tsResult{ + segments: segments, + series: series, + qo: qo, + sm: s, + } + if sqo.Order == nil { + result.asc = true + } else if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { + result.asc = true + } + return result, nil + } + var result idxResult + result.segments = segments + result.sm = s + result.qo = queryOptions{ + StreamQueryOptions: sqo, + minTimestamp: sqo.TimeRange.Start.UnixNano(), + maxTimestamp: sqo.TimeRange.End.UnixNano(), + seriesToEntity: make(map[common.SeriesID][]*modelv1.TagValue), + } + var sl pbv1.SeriesList seriesFilter := roaring.NewPostingList() for i := range result.segments { sl, err = result.segments[i].Lookup(ctx, series) @@ -86,47 +110,20 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m if seriesFilter.Contains(uint64(sl[j].ID)) { continue } - seriesList = append(seriesList, sl[j]) seriesFilter.Insert(uint64(sl[j].ID)) + result.qo.seriesToEntity[sl[j].ID] = sl[j].EntityValues } result.tabs = append(result.tabs, result.segments[i].Tables()...) 
} - if len(seriesList) == 0 { + if seriesFilter.IsEmpty() { return &result, nil } - result.qo = queryOptions{ - StreamQueryOptions: sqo, - minTimestamp: sqo.TimeRange.Start.UnixNano(), - maxTimestamp: sqo.TimeRange.End.UnixNano(), - seriesToEntity: make(map[common.SeriesID][]*modelv1.TagValue), - } - for i := range seriesList { - result.qo.seriesToEntity[seriesList[i].ID] = seriesList[i].EntityValues - result.qo.sortedSids = append(result.qo.sortedSids, seriesList[i].ID) - } - if result.qo.elementFilter, err = indexSearch(ctx, sqo, result.tabs, seriesList); err != nil { + sids := seriesFilter.ToSlice() + if result.qo.elementFilter, err = indexSearch(ctx, sqo, result.tabs, sids); err != nil { return nil, err } - result.tagNameIndex = make(map[string]partition.TagLocator) - result.schema = s.schema - for i, tagFamilySpec := range s.schema.GetTagFamilies() { - for j, tagSpec := range tagFamilySpec.GetTags() { - result.tagNameIndex[tagSpec.GetName()] = partition.TagLocator{ - FamilyOffset: i, - TagOffset: j, - } - } - } - if sqo.Order == nil { - result.orderByTS = true - result.asc = true - return &result, nil - } - - if sqo.Order.Index == nil { - result.orderByTS = true - } else if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, seriesList); err != nil { + if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, sids); err != nil { return nil, err } if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { @@ -144,411 +141,17 @@ type queryOptions struct { maxTimestamp int64 } -type queryResult struct { - sortingIter itersort.Iterator[*index.DocumentResult] - tagNameIndex map[string]partition.TagLocator - schema *databasev1.Stream - tabs []*tsTable - elementIDsSorted []uint64 - data []*blockCursor - snapshots []*snapshot - segments []storage.Segment[*tsTable, option] - qo queryOptions - loaded bool - orderByTS bool - asc bool -} - -func (qr *queryResult) Pull(ctx context.Context) *model.StreamResult { - if 
qr.sortingIter == nil { - qo := qr.qo - sort.Slice(qo.sortedSids, func(i, j int) bool { return qo.sortedSids[i] < qo.sortedSids[j] }) - return qr.load(ctx, qo) - } - if !qr.loaded { - qr.elementIDsSorted = make([]uint64, 0, qr.qo.MaxElementSize) - return qr.loadSortingData(ctx) - } - if v := qr.nextValue(); v != nil { - return v - } - qr.loaded = false - return qr.loadSortingData(ctx) -} - -func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error { - var parts []*part - var n int - for i := range qr.tabs { - s := qr.tabs[i].currentSnapshot() - if s == nil { - continue - } - parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp) - if n < 1 { - s.decRef() - continue - } - qr.snapshots = append(qr.snapshots, s) - } - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) - defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr) - defer defFn() - ti := generateTstIter() - defer releaseTstIter(ti) - sids := qo.sortedSids - ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) - if ti.Error() != nil { - return fmt.Errorf("cannot init tstIter: %w", ti.Error()) - } - var hit int - for ti.nextBlock() { - if hit%checkDoneEvery == 0 { - select { - case <-ctx.Done(): - return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool)) - default: - } - } - hit++ - bc := generateBlockCursor() - p := ti.piHeap[0] - bc.init(p.p, p.curBlock, qo) - qr.data = append(qr.data, bc) - } - if ti.Error() != nil { - return fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) - } - return nil -} - -func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamResult { - if !qr.loaded { - if err := qr.scanParts(ctx, qo); err != nil { - return &model.StreamResult{ - Error: err, - } - } - if len(qr.data) == 0 { - return nil - } - - cursorChan := make(chan int, len(qr.data)) - for i := 0; i < len(qr.data); i++ { - go func(i int) { - select 
{ - case <-ctx.Done(): - cursorChan <- i - return - default: - } - tmpBlock := generateBlock() - defer releaseBlock(tmpBlock) - if !qr.data[i].loadData(tmpBlock) { - cursorChan <- i - return - } - if qr.schema.GetEntity() == nil || len(qr.schema.GetEntity().GetTagNames()) == 0 { - cursorChan <- -1 - return - } - entityValues := qo.seriesToEntity[qr.data[i].bm.seriesID] - entityMap := make(map[string]int) - tagFamilyMap := make(map[string]int) - for idx, entity := range qr.schema.GetEntity().GetTagNames() { - entityMap[entity] = idx + 1 - } - for idx, tagFamily := range qr.data[i].tagFamilies { - tagFamilyMap[tagFamily.name] = idx + 1 - } - for _, tagFamilyProj := range qr.data[i].tagProjection { - for j, tagProj := range tagFamilyProj.Names { - offset := qr.tagNameIndex[tagProj] - tagFamilySpec := qr.schema.GetTagFamilies()[offset.FamilyOffset] - tagSpec := tagFamilySpec.GetTags()[offset.TagOffset] - if tagSpec.IndexedOnly { - continue - } - entityPos := entityMap[tagProj] - tagFamilyPos := tagFamilyMap[tagFamilyProj.Family] - if entityPos == 0 { - continue - } - if tagFamilyPos == 0 { - qr.data[i].tagFamilies[tagFamilyPos-1] = tagFamily{ - name: tagFamilyProj.Family, - tags: make([]tag, 0), - } - } - valueType := pbv1.MustTagValueToValueType(entityValues[entityPos-1]) - qr.data[i].tagFamilies[tagFamilyPos-1].tags[j] = tag{ - name: tagProj, - values: mustEncodeTagValue(tagProj, tagSpec.GetType(), entityValues[entityPos-1], len(qr.data[i].timestamps)), - valueType: valueType, - } - } - } - if qr.orderByTimestampDesc() { - qr.data[i].idx = len(qr.data[i].timestamps) - 1 - } - cursorChan <- -1 - }(i) - } - - blankCursorList := []int{} - for completed := 0; completed < len(qr.data); completed++ { - result := <-cursorChan - if result != -1 { - blankCursorList = append(blankCursorList, result) - } - } - select { - case <-ctx.Done(): - return &model.StreamResult{ - Error: errors.WithMessagef(ctx.Err(), "interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)), - 
} - default: - } - sort.Slice(blankCursorList, func(i, j int) bool { - return blankCursorList[i] > blankCursorList[j] - }) - for _, index := range blankCursorList { - releaseBlockCursor(qr.data[index]) - qr.data = append(qr.data[:index], qr.data[index+1:]...) - } - qr.loaded = true - heap.Init(qr) - } - return qr.nextValue() -} - -func (qr *queryResult) nextValue() *model.StreamResult { - if len(qr.data) == 0 { - return nil - } - if !qr.orderByTS { - return qr.mergeByTagValue() - } - if len(qr.data) == 1 { - r := &model.StreamResult{} - bc := qr.data[0] - bc.copyAllTo(r, qr.orderByTimestampDesc()) - qr.releaseBlockCursor() - return r - } - return qr.mergeByTimestamp() -} - -func (qr *queryResult) loadSortingData(ctx context.Context) *model.StreamResult { - var qo queryOptions - qo.StreamQueryOptions = qr.qo.StreamQueryOptions - qo.elementFilter = roaring.NewPostingList() - qo.seriesToEntity = qr.qo.seriesToEntity - qr.elementIDsSorted = qr.elementIDsSorted[:0] - count, searchedSize := 1, 0 - tracer := query.GetTracer(ctx) - if tracer != nil { - span, _ := tracer.StartSpan(ctx, "load-sorting-data") - span.Tagf("max_element_size", "%d", qo.MaxElementSize) - if qr.qo.elementFilter != nil { - span.Tag("filter_size", fmt.Sprintf("%d", qr.qo.elementFilter.Len())) - } - defer func() { - span.Tagf("searched_size", "%d", searchedSize) - span.Tagf("count", "%d", count) - span.Stop() - }() - } - for ; qr.sortingIter.Next(); count++ { - searchedSize++ - val := qr.sortingIter.Val() - if qr.qo.elementFilter != nil && !qr.qo.elementFilter.Contains(val.DocID) { - count-- - continue - } - qo.elementFilter.Insert(val.DocID) - if val.Timestamp > qo.maxTimestamp { - qo.maxTimestamp = val.Timestamp - } - if val.Timestamp < qo.minTimestamp || qo.minTimestamp == 0 { - qo.minTimestamp = val.Timestamp - } - qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID) - - // Insertion sort - insertPos, found := -1, false - for i, sid := range qo.sortedSids { - if val.SeriesID == sid { - 
found = true - break - } - if val.SeriesID < sid { - insertPos = i - break - } - } - - if !found { - if insertPos == -1 { - qo.sortedSids = append(qo.sortedSids, val.SeriesID) - } else { - qo.sortedSids = append(qo.sortedSids[:insertPos], append([]common.SeriesID{val.SeriesID}, qo.sortedSids[insertPos:]...)...) - } - } - if count >= qo.MaxElementSize { - break - } - } - if qo.elementFilter.IsEmpty() { - return nil - } - return qr.load(ctx, qo) -} - -func (qr *queryResult) releaseParts() { - qr.releaseBlockCursor() - for i := range qr.snapshots { - qr.snapshots[i].decRef() - } - qr.snapshots = qr.snapshots[:0] -} - -func (qr *queryResult) releaseBlockCursor() { - for i, v := range qr.data { - releaseBlockCursor(v) - qr.data[i] = nil - } - qr.data = qr.data[:0] -} - -func (qr *queryResult) Release() { - qr.releaseParts() - for i := range qr.segments { - qr.segments[i].DecRef() - } -} - -func (qr queryResult) Len() int { - return len(qr.data) -} - -func (qr queryResult) Less(i, j int) bool { - leftIdx, rightIdx := qr.data[i].idx, qr.data[j].idx - leftTS := qr.data[i].timestamps[leftIdx] - rightTS := qr.data[j].timestamps[rightIdx] - if qr.asc { - return leftTS < rightTS - } - return leftTS > rightTS -} - -func (qr queryResult) Swap(i, j int) { - qr.data[i], qr.data[j] = qr.data[j], qr.data[i] -} - -func (qr *queryResult) Push(x interface{}) { - qr.data = append(qr.data, x.(*blockCursor)) -} - -func (qr *queryResult) Pop() interface{} { - old := qr.data - n := len(old) - x := old[n-1] - qr.data = old[0 : n-1] - releaseBlockCursor(x) - return x -} - -func (qr *queryResult) orderByTimestampDesc() bool { - return qr.orderByTS && !qr.asc -} - -func (qr *queryResult) mergeByTagValue() *model.StreamResult { - defer qr.releaseBlockCursor() - tmp := &model.StreamResult{} - prevIdx := 0 - elementIDToIdx := make(map[uint64]int) - for _, data := range qr.data { - data.copyAllTo(tmp, false) - var idx int - for idx = prevIdx; idx < len(tmp.Timestamps); idx++ { - 
elementIDToIdx[tmp.ElementIDs[idx]] = idx - } - prevIdx = idx - } - - r := &model.StreamResult{ - TagFamilies: []model.TagFamily{}, - } - for _, tagFamily := range tmp.TagFamilies { - tf := model.TagFamily{ - Name: tagFamily.Name, - Tags: []model.Tag{}, - } - for _, tag := range tagFamily.Tags { - t := model.Tag{ - Name: tag.Name, - Values: []*modelv1.TagValue{}, - } - tf.Tags = append(tf.Tags, t) - } - r.TagFamilies = append(r.TagFamilies, tf) - } - for _, id := range qr.elementIDsSorted { - idx, ok := elementIDToIdx[id] - if !ok { - continue - } - r.Timestamps = append(r.Timestamps, tmp.Timestamps[idx]) - r.ElementIDs = append(r.ElementIDs, tmp.ElementIDs[idx]) - for i := 0; i < len(r.TagFamilies); i++ { - for j := 0; j < len(r.TagFamilies[i].Tags); j++ { - r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, tmp.TagFamilies[i].Tags[j].Values[idx]) - } - } - } - return r -} - -func (qr *queryResult) mergeByTimestamp() *model.StreamResult { - step := 1 - if qr.orderByTimestampDesc() { - step = -1 - } - result := &model.StreamResult{} - var lastSid common.SeriesID - - for qr.Len() > 0 { - topBC := qr.data[0] - if lastSid != 0 && topBC.bm.seriesID != lastSid { - return result - } - lastSid = topBC.bm.seriesID - - topBC.copyTo(result) - topBC.idx += step - - if qr.orderByTimestampDesc() { - if topBC.idx < 0 { - heap.Pop(qr) - } else { - heap.Fix(qr, 0) - } - } else { - if topBC.idx >= len(topBC.timestamps) { - heap.Pop(qr) - } else { - heap.Fix(qr, 0) - } - } - } - - return result +func (qo *queryOptions) copyFrom(other *queryOptions) { + qo.StreamQueryOptions = other.StreamQueryOptions + qo.elementFilter = other.elementFilter + qo.seriesToEntity = other.seriesToEntity + qo.sortedSids = other.sortedSids + qo.minTimestamp = other.minTimestamp + qo.maxTimestamp = other.maxTimestamp } func indexSearch(ctx context.Context, sqo model.StreamQueryOptions, - tabs []*tsTable, seriesList pbv1.SeriesList, + tabs []*tsTable, seriesList []uint64, ) 
(posting.List, error) { if sqo.Filter == nil || sqo.Filter == logicalstream.ENode { return nil, nil @@ -571,11 +174,15 @@ func indexSearch(ctx context.Context, sqo model.StreamQueryOptions, } func (s *stream) indexSort(ctx context.Context, sqo model.StreamQueryOptions, tabs []*tsTable, - seriesList pbv1.SeriesList, + sids []uint64, ) (itersort.Iterator[*index.DocumentResult], error) { if sqo.Order == nil || sqo.Order.Index == nil { return nil, nil } + seriesList := make([]common.SeriesID, len(sids)) + for i := range sids { + seriesList[i] = common.SeriesID(sids[i]) + } iters, err := s.buildItersByIndex(ctx, tabs, seriesList, sqo) if err != nil { return nil, err @@ -585,13 +192,12 @@ func (s *stream) indexSort(ctx context.Context, sqo model.StreamQueryOptions, ta } func (s *stream) buildItersByIndex(ctx context.Context, tables []*tsTable, - seriesList pbv1.SeriesList, sqo model.StreamQueryOptions, + sids []common.SeriesID, sqo model.StreamQueryOptions, ) (iters []itersort.Iterator[*index.DocumentResult], err error) { indexRuleForSorting := sqo.Order.Index if len(indexRuleForSorting.Tags) != 1 { return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags)) } - sids := seriesList.IDs() for _, tw := range tables { var iter index.FieldIterator[*index.DocumentResult] fieldKey := index.FieldKey{ diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go new file mode 100644 index 00000000..106e556c --- /dev/null +++ b/banyand/stream/query_by_idx.go @@ -0,0 +1,321 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "fmt" + "sort" + + "github.com/pkg/errors" + + "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" + itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort" + "github.com/apache/skywalking-banyandb/pkg/query" + "github.com/apache/skywalking-banyandb/pkg/query/model" +) + +type idxResult struct { + sortingIter itersort.Iterator[*index.DocumentResult] + sm *stream + tabs []*tsTable + elementIDsSorted []uint64 + data []*blockCursor + snapshots []*snapshot + segments []storage.Segment[*tsTable, option] + qo queryOptions + loaded bool + asc bool +} + +func (qr *idxResult) Pull(ctx context.Context) *model.StreamResult { + if !qr.loaded { + qr.elementIDsSorted = make([]uint64, 0, qr.qo.MaxElementSize) + return qr.loadSortingData(ctx) + } + if v := qr.nextValue(); v != nil { + return v + } + qr.loaded = false + return qr.loadSortingData(ctx) +} + +func (qr *idxResult) scanParts(ctx context.Context, qo queryOptions) error { + var parts []*part + var n int + for i := range qr.tabs { + s := qr.tabs[i].currentSnapshot() + if s == nil { + continue + } + parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp) + if n < 1 { + s.decRef() + continue + } + qr.snapshots = append(qr.snapshots, s) + } + bma := generateBlockMetadataArray() + defer 
releaseBlockMetadataArray(bma) + defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr) + defer defFn() + ti := generateTstIter() + defer releaseTstIter(ti) + sids := qo.sortedSids + ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) + if ti.Error() != nil { + return fmt.Errorf("cannot init tstIter: %w", ti.Error()) + } + var hit int + for ti.nextBlock() { + if hit%checkDoneEvery == 0 { + select { + case <-ctx.Done(): + return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool)) + default: + } + } + hit++ + bc := generateBlockCursor() + p := ti.piHeap[0] + bc.init(p.p, p.curBlock, qo) + qr.data = append(qr.data, bc) + } + if ti.Error() != nil { + return fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) + } + return nil +} + +func (qr *idxResult) load(ctx context.Context, qo queryOptions) *model.StreamResult { + if qr.loaded { + qr.nextValue() + } + if err := qr.scanParts(ctx, qo); err != nil { + return &model.StreamResult{ + Error: err, + } + } + if len(qr.data) == 0 { + return nil + } + + cursorChan := make(chan int, len(qr.data)) + for i := 0; i < len(qr.data); i++ { + go func(i int) { + select { + case <-ctx.Done(): + cursorChan <- i + return + default: + } + if qr.sm.schema.GetEntity() == nil || len(qr.sm.schema.GetEntity().GetTagNames()) == 0 { + cursorChan <- -1 + return + } + tmpBlock := generateBlock() + defer releaseBlock(tmpBlock) + if loadBlockCursor(qr.data[i], tmpBlock, qo, qr.sm) { + cursorChan <- -1 + return + } + cursorChan <- i + }(i) + } + + blankCursorList := []int{} + for completed := 0; completed < len(qr.data); completed++ { + result := <-cursorChan + if result != -1 { + blankCursorList = append(blankCursorList, result) + } + } + select { + case <-ctx.Done(): + return &model.StreamResult{ + Error: errors.WithMessagef(ctx.Err(), "interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)), + } + default: + } + 
sort.Slice(blankCursorList, func(i, j int) bool { + return blankCursorList[i] > blankCursorList[j] + }) + for _, index := range blankCursorList { + releaseBlockCursor(qr.data[index]) + qr.data = append(qr.data[:index], qr.data[index+1:]...) + } + qr.loaded = true + return qr.nextValue() +} + +func (qr *idxResult) nextValue() *model.StreamResult { + if len(qr.data) == 0 { + return nil + } + return qr.mergeByTagValue() +} + +func (qr *idxResult) loadSortingData(ctx context.Context) *model.StreamResult { + var qo queryOptions + qo.StreamQueryOptions = qr.qo.StreamQueryOptions + qo.elementFilter = roaring.NewPostingList() + qo.seriesToEntity = qr.qo.seriesToEntity + qr.elementIDsSorted = qr.elementIDsSorted[:0] + count, searchedSize := 1, 0 + tracer := query.GetTracer(ctx) + if tracer != nil { + span, _ := tracer.StartSpan(ctx, "load-sorting-data") + span.Tagf("max_element_size", "%d", qo.MaxElementSize) + if qr.qo.elementFilter != nil { + span.Tag("filter_size", fmt.Sprintf("%d", qr.qo.elementFilter.Len())) + } + defer func() { + span.Tagf("searched_size", "%d", searchedSize) + span.Tagf("count", "%d", count) + span.Stop() + }() + } + for ; qr.sortingIter.Next(); count++ { + searchedSize++ + val := qr.sortingIter.Val() + if qr.qo.elementFilter != nil && !qr.qo.elementFilter.Contains(val.DocID) { + count-- + continue + } + qo.elementFilter.Insert(val.DocID) + if val.Timestamp > qo.maxTimestamp { + qo.maxTimestamp = val.Timestamp + } + if val.Timestamp < qo.minTimestamp || qo.minTimestamp == 0 { + qo.minTimestamp = val.Timestamp + } + qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID) + + // Insertion sort + insertPos, found := -1, false + for i, sid := range qo.sortedSids { + if val.SeriesID == sid { + found = true + break + } + if val.SeriesID < sid { + insertPos = i + break + } + } + + if !found { + if insertPos == -1 { + qo.sortedSids = append(qo.sortedSids, val.SeriesID) + } else { + qo.sortedSids = append(qo.sortedSids[:insertPos], 
append([]common.SeriesID{val.SeriesID}, qo.sortedSids[insertPos:]...)...) + } + } + if count >= qo.MaxElementSize { + break + } + } + if qo.elementFilter.IsEmpty() { + return nil + } + return qr.load(ctx, qo) +} + +func (qr *idxResult) releaseParts() { + qr.releaseBlockCursor() + for i := range qr.snapshots { + qr.snapshots[i].decRef() + } + qr.snapshots = qr.snapshots[:0] +} + +func (qr *idxResult) releaseBlockCursor() { + for i, v := range qr.data { + releaseBlockCursor(v) + qr.data[i] = nil + } + qr.data = qr.data[:0] +} + +func (qr *idxResult) Release() { + qr.releaseParts() + for i := range qr.segments { + qr.segments[i].DecRef() + } +} + +func (qr *idxResult) mergeByTagValue() *model.StreamResult { + defer qr.releaseBlockCursor() + tmp := &model.StreamResult{} + prevIdx := 0 + elementIDToIdx := make(map[uint64]int) + for _, data := range qr.data { + data.copyAllTo(tmp, false) + var idx int + for idx = prevIdx; idx < len(tmp.Timestamps); idx++ { + elementIDToIdx[tmp.ElementIDs[idx]] = idx + } + prevIdx = idx + } + + r := &model.StreamResult{ + TagFamilies: []model.TagFamily{}, + } + for _, tagFamily := range tmp.TagFamilies { + tf := model.TagFamily{ + Name: tagFamily.Name, + Tags: []model.Tag{}, + } + for _, tag := range tagFamily.Tags { + t := model.Tag{ + Name: tag.Name, + Values: []*modelv1.TagValue{}, + } + tf.Tags = append(tf.Tags, t) + } + r.TagFamilies = append(r.TagFamilies, tf) + } + for _, id := range qr.elementIDsSorted { + idx, ok := elementIDToIdx[id] + if !ok { + continue + } + r.Timestamps = append(r.Timestamps, tmp.Timestamps[idx]) + r.ElementIDs = append(r.ElementIDs, tmp.ElementIDs[idx]) + for i := 0; i < len(r.TagFamilies); i++ { + for j := 0; j < len(r.TagFamilies[i].Tags); j++ { + r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, tmp.TagFamilies[i].Tags[j].Values[idx]) + } + } + } + return r +} + +var bypassQueryResultInstance = &bypassQueryResult{} + +type bypassQueryResult struct{} + +func (bypassQueryResult) 
Pull(context.Context) *model.StreamResult { + return nil +} + +func (bypassQueryResult) Release() {} diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go new file mode 100644 index 00000000..0c9ccb11 --- /dev/null +++ b/banyand/stream/query_by_ts.go @@ -0,0 +1,287 @@ +package stream + +import ( + "container/heap" + "context" + "sync" + + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" + "github.com/apache/skywalking-banyandb/pkg/query/model" +) + +var _ model.StreamQueryResult = (*tsResult)(nil) + +type tsResult struct { + sm *stream + segments []storage.Segment[*tsTable, option] + series []*pbv1.Series + shards []*model.StreamResult + qo queryOptions + asc bool +} + +// Pull scans the next remaining segment and returns up to qo.MaxElementSize of its rows, +// merged in timestamp order, or a result carrying the scan errors. It returns nil once +// every segment has been consumed, as idxResult and bypassQueryResult do at the end. +func (t *tsResult) Pull(ctx context.Context) *model.StreamResult { + if len(t.segments) == 0 { + return nil + } + if err := t.scanSegment(ctx); err != nil { + return &model.StreamResult{Error: err} + } + var err error + for i := range t.shards { + if t.shards[i].Error != nil { + err = multierr.Append(err, t.shards[i].Error) + } + } + if err != nil { + return &model.StreamResult{Error: err} + } + return model.MergeStreamResults(t.shards, t.qo.MaxElementSize, t.asc) +} + +func (t *tsResult) scanSegment(ctx context.Context) error { + var segment storage.Segment[*tsTable, option] + if t.asc { + segment = t.segments[len(t.segments)-1] + t.segments = t.segments[:len(t.segments)-1] + } else { + segment = t.segments[0] + t.segments = t.segments[1:] + } + + bs := blockScanner{ + segment: segment, + qo: t.qo, + series: t.series, + } + defer bs.close() + if 
err := bs.searchSeries(ctx); err != nil { + return err + } + workerSize := cgroups.CPUs() + var workerWg sync.WaitGroup + batchCh := make(chan *blockScanResultBatch, workerSize) + workerWg.Add(workerSize) + if 
t.shards == nil { + t.shards = make([]*model.StreamResult, workerSize) + for i := range t.shards { + t.shards[i] = model.NewStreamResult(t.qo.MaxElementSize, t.asc) + } + } else { + for i := range t.shards { + t.shards[i].Reset() + } + } + for i := 0; i < workerSize; i++ { + go func(workerID int) { + defer workerWg.Done() + tmpBlock := generateBlock() + defer releaseBlock(tmpBlock) + blockHeap := generateBlockCursorHeap(t.asc) + defer releaseBlockCursorHeap(blockHeap) + tmpResult := model.NewStreamResult(t.qo.MaxElementSize, t.asc) + for batch := range batchCh { + if batch.err != nil { + t.shards[workerID].Error = batch.err + releaseBlockScanResultBatch(batch) + continue + } + for _, bs := range batch.bss { + bc := generateBlockCursor() + bc.init(bs.p, &bs.bm, bs.qo) + if loadBlockCursor(bc, tmpBlock, bs.qo, t.sm) { + if !t.asc { + bc.idx = len(bc.timestamps) - 1 + } + blockHeap.Push(bc) + } + } + releaseBlockScanResultBatch(batch) + heap.Init(blockHeap) + result := blockHeap.merge(t.qo.MaxElementSize) + t.shards[workerID].CopyFrom(tmpResult, result) + blockHeap.reset() + } + }(i) + } + + var scannerWg sync.WaitGroup + finalizers := bs.scanShardsInParallel(ctx, &scannerWg, batchCh) + scannerWg.Wait() + close(batchCh) + workerWg.Wait() + for i := range finalizers { + if finalizers[i] != nil { + finalizers[i]() + } + } + return nil +} + +// loadBlockCursor loads the block behind bc into tmpBlock and fills the projected entity +// tags, which it takes from qo.seriesToEntity rather than from the block. When the block +// yields no data it releases bc and returns false. 
+func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, sm *stream) bool { + tmpBlock.reset() + if !bc.loadData(tmpBlock) { + releaseBlockCursor(bc) + return false + } + entityValues := qo.seriesToEntity[bc.bm.seriesID] + + // Family positions are stored 1-based so that a family missing from the block reads as 0. + tagFamilyMap := make(map[string]int) + for idx, tf := range bc.tagFamilies { + tagFamilyMap[tf.name] = idx + 1 + } + for _, tagFamilyProj := range bc.tagProjection { + for j, tagProj := range tagFamilyProj.Names { + tagSpec := sm.tagMap[tagProj] + // A projected tag without a spec must be skipped before its fields are read. + if tagSpec == nil || tagSpec.GetIndexedOnly() { + continue + } + // entityMap is 1-based: 0 means tagProj is not an entity tag. Series without + // (enough) entity values are skipped instead of indexing past entityValues. + entityPos := sm.entityMap[tagProj] + if entityPos == 0 || entityPos > len(entityValues) { + continue + } + tagFamilyPos := tagFamilyMap[tagFamilyProj.Family] + if tagFamilyPos == 0 { + // The family is absent from the loaded block: append one sized to the projection + // so that index j is valid, rather than indexing position -1 and panicking. + // NOTE(review): its non-entity tags carry no values; confirm copyTo tolerates that. + newTags := make([]tag, len(tagFamilyProj.Names)) + for k, name := range tagFamilyProj.Names { + newTags[k].name = name + } + bc.tagFamilies = append(bc.tagFamilies, tagFamily{name: tagFamilyProj.Family, tags: newTags}) + tagFamilyPos = len(bc.tagFamilies) + tagFamilyMap[tagFamilyProj.Family] = tagFamilyPos + } + tags := bc.tagFamilies[tagFamilyPos-1].tags + if j >= len(tags) { + // NOTE(review): the loaded family holds fewer tags than the projection; skip rather than panic. + continue + } + entityValue := entityValues[entityPos-1] + valueType := pbv1.MustTagValueToValueType(entityValue) + tags[j] = tag{ + name: tagProj, + values: mustEncodeTagValue(tagProj, tagSpec.GetType(), 
entityValue, len(bc.timestamps)), + valueType: valueType, + } + } + } + return true +}
+ +func (t *tsResult) Release() { + for i := range t.segments { + t.segments[i].DecRef() + } +} + +type blockCursorHeap struct { + bcc []*blockCursor + asc bool +} + +func (bch blockCursorHeap) Len() int { + return len(bch.bcc) +} + +func (bch blockCursorHeap) Less(i, j int) bool { + leftIdx, rightIdx := bch.bcc[i].idx, bch.bcc[j].idx + leftTS := bch.bcc[i].timestamps[leftIdx] + rightTS := bch.bcc[j].timestamps[rightIdx] + if bch.asc { + return leftTS < rightTS + } + return leftTS > rightTS +} + +func (bch *blockCursorHeap) Swap(i, j int) { + bch.bcc[i], bch.bcc[j] = bch.bcc[j], bch.bcc[i] +} + +func (bch *blockCursorHeap) Push(x interface{}) { + bch.bcc = append(bch.bcc, x.(*blockCursor)) +} + +func (bch *blockCursorHeap) Pop() interface{} { + old := bch.bcc + n := len(old) + x := old[n-1] + bch.bcc = old[0 : n-1] + releaseBlockCursor(x) + return x +} + +func (bch *blockCursorHeap) reset() { + for i := range bch.bcc { + releaseBlockCursor(bch.bcc[i]) + } + bch.bcc = bch.bcc[:0] +} + +func (bch *blockCursorHeap) merge(limit int) *model.StreamResult { + step := -1 + if bch.asc { + step = 1 + } + result := &model.StreamResult{} + + for bch.Len() > 0 { + topBC := bch.bcc[0] + topBC.copyTo(result) + if result.Len() >= limit { + break + } + topBC.idx += step + + if bch.asc { + if topBC.idx >= len(topBC.timestamps) { + heap.Pop(bch) + } else { + heap.Fix(bch, 0) + } + } else { + if topBC.idx < 0 { + heap.Pop(bch) + } else { + heap.Fix(bch, 0) + } + } + } + + return result +} + +var blockCursorHeapPool = 
pool.Register[*blockCursorHeap]("stream-blockCursorHeap") + +func generateBlockCursorHeap(asc bool) *blockCursorHeap { + v := blockCursorHeapPool.Get() + if v == nil { + return &blockCursorHeap{ + asc: asc, + bcc: make([]*blockCursor, 0, blockScannerBatchSize), + } + } + v.asc = asc + return v +} + +func releaseBlockCursorHeap(bch *blockCursorHeap) { + bch.reset() + blockCursorHeapPool.Put(bch) +} diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go deleted file mode 100644 index 9eea344d..00000000 --- a/banyand/stream/query_test.go +++ /dev/null @@ -1,424 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License.
- -package stream - -import ( - "context" - "errors" - "sort" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/testing/protocmp" - - "github.com/apache/skywalking-banyandb/api/common" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/pkg/fs" - "github.com/apache/skywalking-banyandb/pkg/logger" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" - "github.com/apache/skywalking-banyandb/pkg/query/model" - "github.com/apache/skywalking-banyandb/pkg/run" - "github.com/apache/skywalking-banyandb/pkg/test" - "github.com/apache/skywalking-banyandb/pkg/timestamp" - "github.com/apache/skywalking-banyandb/pkg/watcher" -) - -func TestQueryResult(t *testing.T) { - tests := []struct { - wantErr error - name string - esList []*elements - sids []common.SeriesID - want []model.StreamResult - minTimestamp int64 - maxTimestamp int64 - orderBySeries bool - ascTS bool - }{ - { - name: "Test with multiple parts with duplicated data order by TS", - esList: []*elements{esTS1, esTS1}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 1, - want: []model.StreamResult{{ - SIDs: []common.SeriesID{1}, - Timestamps: []int64{1}, - ElementIDs: []uint64{11}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, - {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag2", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - }, - }, { - SIDs: []common.SeriesID{3, 3}, - Timestamps: []int64{1, 1}, - ElementIDs: []uint64{31, 31}, - TagFamilies: emptyTagFamilies(2), - }, { - SIDs: []common.SeriesID{2, 2}, - Timestamps: []int64{1, 1}, - ElementIDs: []uint64{21, 21}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag1")}}, - {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2"), strTagValue("tag2")}}, - }}, - }, - }, { - SIDs: []common.SeriesID{1}, - Timestamps: []int64{1}, - ElementIDs: []uint64{11}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, - {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - }, - }}, - }, - { - name: "Test with multiple parts with multiple data orderBy TS desc", - 
esList: []*elements{esTS1, esTS2}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 2, - want: []model.StreamResult{{ - SIDs: []common.SeriesID{1}, - Timestamps: []int64{2}, - ElementIDs: []uint64{12}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value3")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(30)}}, - {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - }, - }, { - SIDs: []common.SeriesID{2}, - Timestamps: []int64{2}, - ElementIDs: []uint64{22}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag3")}}, - {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag4")}}, - }}, - }, - }, { - SIDs: []common.SeriesID{3}, - Timestamps: []int64{2}, - ElementIDs: []uint64{32}, - TagFamilies: emptyTagFamilies(1), - }, { - SIDs: []common.SeriesID{1}, - Timestamps: []int64{1}, - ElementIDs: []uint64{11}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - 
{Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, - {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - }, - }, { - SIDs: []common.SeriesID{3}, - Timestamps: []int64{1}, - ElementIDs: []uint64{31}, - TagFamilies: emptyTagFamilies(1), - }, { - SIDs: []common.SeriesID{2}, - Timestamps: []int64{1}, - ElementIDs: []uint64{21}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, - {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, - }}, - }, - }}, - }, - { - name: "Test with multiple parts with multiple data orderBy TS asc", - esList: []*elements{esTS1, esTS2}, - sids: []common.SeriesID{1, 2, 3}, - ascTS: true, - minTimestamp: 1, - maxTimestamp: 2, - want: []model.StreamResult{{ - SIDs: []common.SeriesID{1}, - Timestamps: []int64{1}, - ElementIDs: []uint64{11}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, - {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - }, - }, { - SIDs: []common.SeriesID{3}, - Timestamps: []int64{1}, - ElementIDs: []uint64{31}, - TagFamilies: emptyTagFamilies(1), - }, { - SIDs: []common.SeriesID{2, 2}, - Timestamps: []int64{1, 2}, - ElementIDs: []uint64{21, 22}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, - {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag3")}}, - {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2"), strTagValue("tag4")}}, - }}, - }, - }, { - SIDs: []common.SeriesID{1}, - Timestamps: []int64{2}, - ElementIDs: []uint64{12}, - TagFamilies: []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{35, 
40})}}, - }}, - {Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value3")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(30)}}, - {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, - }}, - }, - }, { - SIDs: []common.SeriesID{3}, - Timestamps: []int64{2}, - ElementIDs: []uint64{32}, - TagFamilies: emptyTagFamilies(1), - }}, - }, - } - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - verify := func(t *testing.T, tst *tsTable) { - var result queryResult - - result.qo = queryOptions{ - minTimestamp: tt.minTimestamp, - maxTimestamp: tt.maxTimestamp, - sortedSids: tt.sids, - StreamQueryOptions: model.StreamQueryOptions{ - TagProjection: tagProjectionAll, - }, - } - result.tabs = []*tsTable{tst} - defer result.Release() - if !tt.orderBySeries { - result.orderByTS = true - result.asc = tt.ascTS - } - var got []model.StreamResult - ctx := context.Background() - for { - r := result.Pull(ctx) - if r == nil { - break - } - if !errors.Is(r.Error, tt.wantErr) { - t.Errorf("Unexpected error: got %v, want %v", r.Error, tt.wantErr) - } - sort.Slice(r.TagFamilies, func(i, j int) bool { - return r.TagFamilies[i].Name < r.TagFamilies[j].Name - }) - got = append(got, *r) - } - - if diff := cmp.Diff(got, tt.want, - protocmp.IgnoreUnknown(), protocmp.Transform()); diff != "" { - t.Errorf("Unexpected []pbv1.Result (-got +want):\n%s", diff) - } - } - - t.Run("memory snapshot", func(t *testing.T) { - tmpPath, defFn := test.Space(require.New(t)) - defer defFn() - index, _ := newElementIndex(context.TODO(), tmpPath, 0, nil) - tst := &tsTable{ - index: index, - loopCloser: run.NewCloser(2), - introductions: make(chan 
*introduction), - fileSystem: fs.NewLocalFileSystem(), - root: tmpPath, - } - tst.gc.init(tst) - flushCh := make(chan *flusherIntroduction) - mergeCh := make(chan *mergerIntroduction) - introducerWatcher := make(watcher.Channel, 1) - go tst.introducerLoop(flushCh, mergeCh, introducerWatcher, 1) - for _, es := range tt.esList { - tst.mustAddElements(es) - time.Sleep(100 * time.Millisecond) - } - verify(t, tst) - }) - - t.Run("file snapshot", func(t *testing.T) { - // Initialize a tstIter object. - tmpPath, defFn := test.Space(require.New(t)) - fileSystem := fs.NewLocalFileSystem() - defer defFn() - tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, - // Since Stream deduplicate data in merging process, we need to disable the merging in the test. - logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDisabledMergePolicyForTesting()}, nil) - require.NoError(t, err) - for _, es := range tt.esList { - tst.mustAddElements(es) - time.Sleep(100 * time.Millisecond) - } - // wait until the introducer is done - if len(tt.esList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator != snapshotCreatorMemPart && len(snp.parts) == len(tt.esList) { - snp.decRef() - tst.Close() - break - } - snp.decRef() - time.Sleep(100 * time.Millisecond) - } - } - - // reopen the table - tst, err = newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}, nil) - require.NoError(t, err) - - verify(t, tst) - }) - }) - } -} - -func emptyTagFamilies(size int) []model.TagFamily { - var values []*modelv1.TagValue - for i := 0; i < size; i++ { - values = append(values, pbv1.NullTagValue) - } - return []model.TagFamily{ - {Name: "arrTag", Tags: []model.Tag{ - {Name: "strArrTag", Values: values}, - {Name: "intArrTag", Values: values}, - }}, - 
{Name: "binaryTag", Tags: []model.Tag{ - {Name: "binaryTag", Values: values}, - }}, - {Name: "singleTag", Tags: []model.Tag{ - {Name: "strTag", Values: values}, - {Name: "intTag", Values: values}, - {Name: "strTag1", Values: values}, - {Name: "strTag2", Values: values}, - }}, - } -} diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 406e9bc4..9e4ebaa7 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -70,10 +70,12 @@ type stream struct { databaseSupplier schema.Supplier l *logger.Logger schema *databasev1.Stream + tagMap map[string]*databasev1.TagSpec // tag name -> spec across all tag families; filled by parseSpec + entityMap map[string]int // entity tag name -> 1-based position in the entity; filled by parseSpec name string group string - indexRules []*databasev1.IndexRule indexRuleLocators partition.IndexRuleLocator + indexRules []*databasev1.IndexRule shardNum uint32 } @@ -92,6 +94,16 @@ func (s *stream) Close() error { func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() s.indexRuleLocators, _ = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules, false) + s.tagMap = make(map[string]*databasev1.TagSpec) + for _, tf := range s.schema.GetTagFamilies() { + for _, tag := range tf.GetTags() { + s.tagMap[tag.GetName()] = tag + } + } + s.entityMap = make(map[string]int) + for idx, entity := range s.schema.GetEntity().GetTagNames() { + s.entityMap[entity] = idx + 1 // 1-based so that a missing key (0) means "not an entity tag" + } } type streamSpec struct { diff --git a/banyand/stream/trace.go b/banyand/stream/trace.go index ed428a67..5bcddecc 100644 --- a/banyand/stream/trace.go +++ b/banyand/stream/trace.go @@ -50,7 +50,7 @@ func (bc *blockCursor) String() string { bc.p.partMetadata.ID, bc.bm.seriesID, minTimestamp, maxTimestamp, bc.bm.count, humanize.Bytes(bc.bm.uncompressedSizeBytes)) } -func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr *queryResult) 
func() { +func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr *idxResult) func() { tracer := query.GetTracer(ctx) if tracer 
== nil { return func() {} diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go index 42e2a6eb..3945f0bb 100644 --- a/banyand/stream/tstable_test.go +++ b/banyand/stream/tstable_test.go @@ -32,7 +32,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/fs" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" - "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/watcher" @@ -251,12 +250,6 @@ func Test_tstIter(t *testing.T) { }) } -var tagProjectionAll = []model.TagProjection{ - {Family: "arrTag", Names: []string{"strArrTag", "intArrTag"}}, - {Family: "binaryTag", Names: []string{"binaryTag"}}, - {Family: "singleTag", Names: []string{"strTag", "intTag", "strTag1", "strTag2"}}, -} - var esTS1 = &elements{ seriesIDs: []common.SeriesID{1, 2, 3}, timestamps: []int64{1, 1, 1}, diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go index b9c6fa80..f13fc161 100644 --- a/pkg/pb/v1/value.go +++ b/pkg/pb/v1/value.go @@ -130,7 +130,7 @@ func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) { case *modelv1.TagValue_Null: dest = marshalEntityValue(dest, nil) case *modelv1.TagValue_Str: - dest = marshalEntityValue(dest, convert.StringToBytes(tv.GetStr().Value)) + dest = marshalEntityValue(dest, []byte(tv.GetStr().Value)) case *modelv1.TagValue_Int: dest = marshalEntityValue(dest, encoding.Int64ToBytes(nil, tv.GetInt().Value)) case *modelv1.TagValue_BinaryData: diff --git a/pkg/query/logical/optimizer.go b/pkg/query/logical/optimizer.go index 471f78b8..0c793d8d 100644 --- a/pkg/query/logical/optimizer.go +++ b/pkg/query/logical/optimizer.go @@ -78,7 +78,7 @@ func (pdo PushDownOrder) Optimize(plan Plan) (Plan, error) { pdo.order.GetIndexRuleName(), pdo.order.GetSort()); err == nil && order != nil { v.Sort(order) } else { - return nil, err + return 
plan, err } } return plan, nil diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index 768e6fad..e40f2581 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -19,11 +19,13 @@ package model import ( + "container/heap" "context" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -96,6 +98,162 @@ type StreamResult struct { ElementIDs []uint64 TagFamilies []TagFamily SIDs []common.SeriesID + topN int // row limit applied when merging via CopyFrom + idx int // read cursor used while heap-merging rows + asc bool // timestamp order used when merging +} + +// NewStreamResult creates an empty StreamResult that keeps at most topN rows, in ascending timestamp order when asc is true, when merged via CopyFrom. +func NewStreamResult(topN int, asc bool) *StreamResult { + return &StreamResult{ + topN: topN, + asc: asc, + Timestamps: make([]int64, 0, topN), + ElementIDs: make([]uint64, 0, topN), + TagFamilies: make([]TagFamily, 0, topN), + SIDs: make([]common.SeriesID, 0, topN), + } +} + +// Len returns the number of rows (timestamps) in the StreamResult. +func (sr *StreamResult) Len() int { + return len(sr.Timestamps) +} + +// Reset clears the rows, error and read cursor while keeping the allocated capacity. +func (sr *StreamResult) Reset() { + sr.Error = nil + sr.idx = 0 + sr.Timestamps = sr.Timestamps[:0] + sr.ElementIDs = sr.ElementIDs[:0] + sr.TagFamilies = sr.TagFamilies[:0] + sr.SIDs = sr.SIDs[:0] +} + +// CopyFrom merges the rows of sr and other in timestamp order back into sr, keeping at most sr.topN rows; tmp is scratch space whose contents are overwritten. It reports whether sr holds topN rows. 
+func (sr *StreamResult) CopyFrom(tmp, other *StreamResult) bool { + // Prepare a reusable tmp result + tmp.Reset() + tmp.topN = sr.topN + tmp.asc = sr.asc + + // Rewind both read cursors before heap-merging + sr.idx = 0 + other.idx = 0 + + h := &StreamResultHeap{asc: sr.asc} + heap.Init(h) + + if sr.Len() > 0 { + heap.Push(h, sr) + } + if other.Len() > 0 { + heap.Push(h, other) + } + + // Pop from heap to build tmp with topN + for h.Len() > 0 && tmp.Len() < tmp.topN { + res := heap.Pop(h).(*StreamResult) + tmp.CopySingleFrom(res) + res.idx++ + if res.idx < res.Len() { + heap.Push(h, res) + } + } + + // Copy tmp back to sr + sr.Reset() + sr.Timestamps = append(sr.Timestamps, tmp.Timestamps...) + sr.ElementIDs = append(sr.ElementIDs, tmp.ElementIDs...) + sr.SIDs = append(sr.SIDs, tmp.SIDs...) + sr.TagFamilies = append(sr.TagFamilies, tmp.TagFamilies...) + + return len(sr.Timestamps) >= sr.topN +} + +// CopySingleFrom appends the row at other's read cursor (other.idx) to sr, creating sr's tag families from other's on first use; it panics if the tag layouts differ. +func (sr *StreamResult) CopySingleFrom(other *StreamResult) { + sr.SIDs = append(sr.SIDs, other.SIDs[other.idx]) // NOTE(review): assumes other.SIDs has one entry per row; confirm every producer fills SIDs + sr.Timestamps = append(sr.Timestamps, other.Timestamps[other.idx]) + sr.ElementIDs = append(sr.ElementIDs, other.ElementIDs[other.idx]) + if len(sr.TagFamilies) < len(other.TagFamilies) { + for i := range other.TagFamilies { + tf := TagFamily{ + Name: other.TagFamilies[i].Name, + Tags: make([]Tag, len(other.TagFamilies[i].Tags)), + } + for j := range tf.Tags { + tf.Tags[j].Name = other.TagFamilies[i].Tags[j].Name + } + sr.TagFamilies = append(sr.TagFamilies, tf) + } + } + if len(sr.TagFamilies) != len(other.TagFamilies) { + logger.Panicf("tag family length mismatch: %d != %d", len(sr.TagFamilies), len(other.TagFamilies)) + } + for i := range sr.TagFamilies { + if len(sr.TagFamilies[i].Tags) != 
len(other.TagFamilies[i].Tags) { + logger.Panicf("tag length mismatch: %d != %d", len(sr.TagFamilies[i].Tags), len(other.TagFamilies[i].Tags)) + } + for j := range sr.TagFamilies[i].Tags { + sr.TagFamilies[i].Tags[j].Values = 
append(sr.TagFamilies[i].Tags[j].Values, other.TagFamilies[i].Tags[j].Values[other.idx]) + } + } +} + +// StreamResultHeap orders StreamResults by the timestamp at their read cursor: a min-heap when asc is true, a max-heap otherwise. +type StreamResultHeap struct { + data []*StreamResult + asc bool +} + +func (h StreamResultHeap) Len() int { return len(h.data) } +func (h StreamResultHeap) Less(i, j int) bool { + if h.asc { + return h.data[i].Timestamps[h.data[i].idx] < h.data[j].Timestamps[h.data[j].idx] + } + return h.data[i].Timestamps[h.data[i].idx] > h.data[j].Timestamps[h.data[j].idx] +} +func (h StreamResultHeap) Swap(i, j int) { h.data[i], h.data[j] = h.data[j], h.data[i] } + +// Push pushes a StreamResult pointer to the heap. +func (h *StreamResultHeap) Push(x interface{}) { + h.data = append(h.data, x.(*StreamResult)) +} + +// Pop pops a StreamResult pointer from the heap. +func (h *StreamResultHeap) Pop() interface{} { + old := h.data + n := len(old) + x := old[n-1] + h.data = old[0 : n-1] + return x +} + +// MergeStreamResults merges the rows of results in timestamp order (ascending when asc is true) into a new StreamResult holding at most topN rows. It rewinds and advances each non-empty input's read cursor. +func MergeStreamResults(results []*StreamResult, topN int, asc bool) *StreamResult { + h := &StreamResultHeap{asc: asc} + heap.Init(h) + + for _, result := range results { + if result.Len() > 0 { + result.idx = 0 + heap.Push(h, result) + } + } + + mergedResult := NewStreamResult(topN, asc) + + for h.Len() > 0 && mergedResult.Len() < topN { + sr := heap.Pop(h).(*StreamResult) + mergedResult.CopySingleFrom(sr) + sr.idx++ + if sr.idx < sr.Len() { + heap.Push(h, sr) + } + } + + return mergedResult +}
diff --git a/pkg/query/model/model_test.go b/pkg/query/model/model_test.go
new file mode 100644
index 00000000..dd1750be
--- /dev/null
+++ b/pkg/query/model/model_test.go
@@ -0,0 +1,262 @@
+package model
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
+// newTestStrValues wraps each string in a *modelv1.TagValue carrying a Str
+// value, preserving the order of vals.
+func newTestStrValues(vals ...string) []*modelv1.TagValue {
+	values := make([]*modelv1.TagValue, 0, len(vals))
+	for _, v := range vals {
+		values = append(values, &modelv1.TagValue{
+			Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: v}},
+		})
+	}
+	return values
+}
+
+// newTestTagFamilies builds the tag-family layout shared by every fixture in
+// this file: "family1" and "family2", each holding a single tag "tag1" whose
+// values are family1 and family2 respectively. Each call allocates fresh
+// values, so fixtures never alias one another even when a test mutates them.
+func newTestTagFamilies(family1, family2 []string) []TagFamily {
+	return []TagFamily{
+		{Name: "family1", Tags: []Tag{{Name: "tag1", Values: newTestStrValues(family1...)}}},
+		{Name: "family2", Tags: []Tag{{Name: "tag1", Values: newTestStrValues(family2...)}}},
+	}
+}
+
+// TestStreamResult_CopyFrom calls sr.CopyFrom(tmp, other) and checks that sr
+// ends up holding the merged rows of both results. Every fixture's Timestamps
+// are ordered according to its asc flag.
+func TestStreamResult_CopyFrom(t *testing.T) {
+	tests := []struct {
+		sr       *StreamResult
+		other    *StreamResult
+		name     string
+		wantTS   []int64
+		wantTags []TagFamily
+		wantSIDs []common.SeriesID
+		wantLen  int
+		wantRet  bool
+	}{
+		{
+			name:     "both empty",
+			sr:       NewStreamResult(3, true),
+			other:    NewStreamResult(3, true),
+			wantLen:  0,
+			wantRet:  false,
+			wantTS:   []int64{},
+			wantTags: []TagFamily{},
+			wantSIDs: []common.SeriesID{},
+		},
+		{
+			name: "one with data, one empty",
+			sr: &StreamResult{
+				asc:         true,
+				topN:        3,
+				Timestamps:  []int64{2, 4},
+				ElementIDs:  []uint64{100, 101},
+				TagFamilies: newTestTagFamilies([]string{"value1", "value2"}, []string{"value1", "value2"}),
+				SIDs:        []common.SeriesID{1, 2},
+			},
+			other:    NewStreamResult(3, true),
+			wantLen:  2,
+			wantRet:  false,
+			wantTS:   []int64{2, 4},
+			wantTags: newTestTagFamilies([]string{"value1", "value2"}, []string{"value1", "value2"}),
+			wantSIDs: []common.SeriesID{1, 2},
+		},
+		{
+			name: "one empty, one with data",
+			sr:   NewStreamResult(3, true),
+			other: &StreamResult{
+				asc:         true,
+				topN:        3,
+				Timestamps:  []int64{2, 4},
+				ElementIDs:  []uint64{100, 101},
+				TagFamilies: newTestTagFamilies([]string{"value1", "value2"}, []string{"value1", "value2"}),
+				SIDs:        []common.SeriesID{1, 2},
+			},
+			wantLen:  2,
+			wantRet:  false,
+			wantTS:   []int64{2, 4},
+			wantTags: newTestTagFamilies([]string{"value1", "value2"}, []string{"value1", "value2"}),
+			wantSIDs: []common.SeriesID{1, 2},
+		},
+		{
+			name: "both have data, combined < topN",
+			sr: &StreamResult{
+				asc:         true,
+				topN:        5,
+				Timestamps:  []int64{10, 30},
+				ElementIDs:  []uint64{1, 2},
+				TagFamilies: newTestTagFamilies([]string{"value11", "value12"}, []string{"value21", "value22"}),
+				SIDs:        []common.SeriesID{1, 2},
+			},
+			other: &StreamResult{
+				asc:         true,
+				topN:        5,
+				Timestamps:  []int64{20},
+				ElementIDs:  []uint64{3},
+				TagFamilies: newTestTagFamilies([]string{"value13"}, []string{"value23"}),
+				SIDs:        []common.SeriesID{3},
+			},
+			wantLen:  3,
+			wantRet:  false,
+			wantTS:   []int64{10, 20, 30},
+			wantTags: newTestTagFamilies([]string{"value11", "value13", "value12"}, []string{"value21", "value23", "value22"}),
+			wantSIDs: []common.SeriesID{1, 3, 2},
+		},
+		{
+			name: "both have data, combined >= topN",
+			sr: &StreamResult{
+				asc:         false,
+				topN:        3,
+				Timestamps:  []int64{30, 20, 10},
+				ElementIDs:  []uint64{5, 6, 7},
+				TagFamilies: newTestTagFamilies([]string{"value11", "value12", "value13"}, []string{"value21", "value22", "value23"}),
+				SIDs:        []common.SeriesID{1, 2, 3},
+			},
+			other: &StreamResult{
+				asc:         false,
+				topN:        3,
+				Timestamps:  []int64{50, 40},
+				ElementIDs:  []uint64{8, 9},
+				TagFamilies: newTestTagFamilies([]string{"value14", "value15"}, []string{"value24", "value25"}),
+				SIDs:        []common.SeriesID{4, 5},
+			},
+			wantLen:  3,
+			wantRet:  true,
+			wantTS:   []int64{50, 40, 30},
+			wantTags: newTestTagFamilies([]string{"value14", "value15", "value11"}, []string{"value24", "value25", "value21"}),
+			wantSIDs: []common.SeriesID{4, 5, 1},
+		},
+	}
+	// tmp is passed as CopyFrom's first argument (presumably scratch space;
+	// confirm in model.go). It is shared by all subtests, which is only safe
+	// because they run sequentially.
+	tmp := &StreamResult{}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := tt.sr.CopyFrom(tmp, tt.other)
+			assert.Equal(t, tt.wantLen, tt.sr.Len(), "unexpected length")
+			assert.Equal(t, tt.wantRet, got, "unexpected return value")
+			assert.Equal(t, tt.wantTS, tt.sr.Timestamps, "unexpected timestamps")
+			assert.Equal(t, tt.wantTags, tt.sr.TagFamilies, "unexpected tag families")
+			assert.Equal(t, tt.wantSIDs, tt.sr.SIDs, "unexpected SIDs")
+			// NOTE(review): ElementIDs are set in every fixture but never
+			// asserted; add a wantEIDs column once their merge order is confirmed.
+		})
+	}
+}
+
+// TestMergeStreamResults checks that MergeStreamResults combines the given
+// results into one, ordered according to asc and capped at topN rows.
+func TestMergeStreamResults(t *testing.T) {
+	tests := []struct {
+		name     string
+		results  []*StreamResult
+		wantTS   []int64
+		wantTags []TagFamily
+		wantSIDs []common.SeriesID
+		topN     int
+		asc      bool
+	}{
+		{
+			name:     "all empty",
+			results:  []*StreamResult{NewStreamResult(3, true), NewStreamResult(3, true)},
+			topN:     3,
+			asc:      true,
+			wantTS:   []int64{},
+			wantTags: []TagFamily{},
+			wantSIDs: []common.SeriesID{},
+		},
+		{
+			name: "one with data, one empty",
+			results: []*StreamResult{
+				{
+					asc:         true,
+					topN:        3,
+					Timestamps:  []int64{2, 4},
+					ElementIDs:  []uint64{100, 101},
+					TagFamilies: newTestTagFamilies([]string{"value1", "value2"}, []string{"value1", "value2"}),
+					SIDs:        []common.SeriesID{1, 2},
+				},
+				NewStreamResult(3, true),
+			},
+			topN:     3,
+			asc:      true,
+			wantTS:   []int64{2, 4},
+			wantTags: newTestTagFamilies([]string{"value1", "value2"}, []string{"value1", "value2"}),
+			wantSIDs: []common.SeriesID{1, 2},
+		},
+		{
+			name: "both have data, combined < topN",
+			results: []*StreamResult{
+				{
+					asc:         true,
+					topN:        5,
+					Timestamps:  []int64{10, 30},
+					ElementIDs:  []uint64{1, 2},
+					TagFamilies: newTestTagFamilies([]string{"value11", "value12"}, []string{"value21", "value22"}),
+					SIDs:        []common.SeriesID{1, 2},
+				},
+				{
+					asc:         true,
+					topN:        5,
+					Timestamps:  []int64{20},
+					ElementIDs:  []uint64{3},
+					TagFamilies: newTestTagFamilies([]string{"value13"}, []string{"value23"}),
+					SIDs:        []common.SeriesID{3},
+				},
+			},
+			topN:     5,
+			asc:      true,
+			wantTS:   []int64{10, 20, 30},
+			wantTags: newTestTagFamilies([]string{"value11", "value13", "value12"}, []string{"value21", "value23", "value22"}),
+			wantSIDs: []common.SeriesID{1, 3, 2},
+		},
+		{
+			name: "both have data, combined >= topN",
+			results: []*StreamResult{
+				{
+					asc:         false,
+					topN:        3,
+					Timestamps:  []int64{30, 20, 10},
+					ElementIDs:  []uint64{5, 6, 7},
+					TagFamilies: newTestTagFamilies([]string{"value11", "value12", "value13"}, []string{"value21", "value22", "value23"}),
+					SIDs:        []common.SeriesID{1, 2, 3},
+				},
+				{
+					asc:         false,
+					topN:        3,
+					Timestamps:  []int64{50, 40},
+					ElementIDs:  []uint64{8, 9},
+					TagFamilies: newTestTagFamilies([]string{"value14", "value15"}, []string{"value24", "value25"}),
+					SIDs:        []common.SeriesID{4, 5},
+				},
+			},
+			topN:     3,
+			asc:      false,
+			wantTS:   []int64{50, 40, 30},
+			wantTags: newTestTagFamilies([]string{"value14", "value15", "value11"}, []string{"value24", "value25", "value21"}),
+			wantSIDs: []common.SeriesID{4, 5, 1},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := MergeStreamResults(tt.results, tt.topN, tt.asc)
+			assert.Equal(t, tt.wantTS, got.Timestamps, "unexpected timestamps")
+			assert.Equal(t, tt.wantTags, got.TagFamilies, "unexpected tag families")
+			assert.Equal(t, tt.wantSIDs, got.SIDs, "unexpected SIDs")
+		})
+	}
+}
