This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch measure-index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a3c413af3ec4138ac4d7491e1179a25d2615f5f8
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jul 31 13:10:10 2024 +0800

    Move indexed values in a measure from data files to index files
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   1 +
 banyand/internal/storage/index.go                  | 110 ++++++----
 banyand/internal/storage/index_test.go             |   2 +-
 banyand/internal/storage/storage.go                |  16 +-
 banyand/measure/block.go                           |  24 +--
 banyand/measure/measure.go                         |  25 +--
 banyand/measure/query.go                           | 168 ++++++++-------
 banyand/measure/write.go                           |   7 +-
 banyand/stream/index.go                            |   2 +-
 banyand/stream/query.go                            |  10 +-
 banyand/stream/stream.go                           |   2 +-
 pkg/index/index.go                                 |  34 +--
 pkg/index/inverted/inverted.go                     |  71 +++---
 pkg/index/inverted/inverted_series.go              |  41 ++--
 pkg/index/inverted/inverted_series_test.go         | 251 +++++++++++++++++-----
 pkg/index/inverted/inverted_test.go                |   9 +-
 pkg/index/inverted/sort.go                         |  44 ++--
 pkg/index/inverted/sort_test.go                    |   4 +-
 pkg/partition/index.go                             |  29 ++-
 pkg/pb/v1/value.go                                 |  19 ++
 test/cases/measure/data/input/entity.yaml          |   2 +-
 test/cases/measure/data/input/entity_in.yaml       |   2 +-
 test/cases/measure/data/input/entity_service.yaml  |   2 +-
 test/cases/measure/data/input/no_field.yaml        |   2 +-
 test/cases/measure/data/want/entity.yaml           |  26 ++-
 test/cases/measure/data/want/entity_in.yaml        |  17 +-
 test/cases/measure/data/want/entity_service.yaml   |  26 ++-
 test/cases/measure/data/want/no_field.yaml         |  18 +-
 28 files changed, 626 insertions(+), 338 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3cc60064..916e7393 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
 - Bump up the version of the file system to 1.1.0 which is not compatible with the previous version.
 - Move the series index into segment.
 - Swap the segment and the shard.
+- Move indexed values in a measure from data files to index files.
### Features diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index 5abb9e91..b8cd31cb 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -20,6 +20,7 @@ package storage import ( "context" "path" + "strings" "github.com/pkg/errors" "go.uber.org/multierr" @@ -33,7 +34,6 @@ import ( pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/logical" - "github.com/apache/skywalking-banyandb/pkg/query/model" ) func (s *segment[T, O]) IndexDB() IndexDB { @@ -41,7 +41,8 @@ func (s *segment[T, O]) IndexDB() IndexDB { } func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) { - return s.index.searchPrimary(ctx, series) + sl, _, err := s.index.searchPrimary(ctx, series, nil) + return sl, err } type seriesIndex struct { @@ -72,38 +73,38 @@ func (s *seriesIndex) Write(docs index.Documents) error { var rangeOpts = index.RangeOpts{} -func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series) (sl pbv1.SeriesList, err error) { +func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series, projection []index.FieldKey) (sl pbv1.SeriesList, fields FieldResultList, err error) { seriesMatchers := make([]index.SeriesMatcher, len(series)) for i := range series { seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i]) if err != nil { - return nil, err + return nil, nil, err } } tracer := query.GetTracer(ctx) - var span *query.Span if tracer != nil { - span, _ = tracer.StartSpan(ctx, "seriesIndex.searchPrimary") + span, _ := tracer.StartSpan(ctx, "seriesIndex.searchPrimary") span.Tagf("matchers", "%v", seriesMatchers) defer func() { + span.Tagf("matched", "%d", len(sl)) + if len(fields) > 0 { + span.Tagf("field_length", "%d", len(fields[0])) + } if err != nil { span.Error(err) } span.Stop() }() } - ss, err := s.store.Search(ctx, seriesMatchers) + ss, err := s.store.Search(ctx, seriesMatchers, projection) if err != nil { - return nil, err + return nil, nil, err } - result, err := convertIndexSeriesToSeriesList(ss) + sl, fields, err = convertIndexSeriesToSeriesList(ss, len(projection) > 0) if err != nil { - return nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss)) + return nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss)) } - if span != nil { - span.Tagf("matched", "%d", len(result)) - } - return result, nil + return sl, fields, nil } var emptySeriesMatcher = index.SeriesMatcher{} @@ -161,20 +162,27 @@ func convertEntityValuesToSeriesMatcher(series *pbv1.Series) (index.SeriesMatche }, nil } -func convertIndexSeriesToSeriesList(indexSeries []index.Series) (pbv1.SeriesList, error) { +func convertIndexSeriesToSeriesList(indexSeries []index.SeriesDocument, hasFields bool) (pbv1.SeriesList, FieldResultList, error) { seriesList := make(pbv1.SeriesList, 0, len(indexSeries)) + var fields FieldResultList + if hasFields { + fields = make(FieldResultList, 0, len(indexSeries)) + } for _, s := range indexSeries { var series pbv1.Series - series.ID = s.ID - if err := series.Unmarshal(s.EntityValues); err != nil { - return nil, errors.WithMessagef(err, "failed to unmarshal series: %s", s.EntityValues) + series.ID = s.Key.ID + if err := series.Unmarshal(s.Key.EntityValues); err != nil { + return nil, nil, 
errors.WithMessagef(err, "failed to unmarshal series: %s", s.Key.EntityValues) } seriesList = append(seriesList, &series) + if fields != nil { + fields = append(fields, s.Fields) + } } - return seriesList, nil + return seriesList, fields, nil } -func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter index.Filter, order *model.OrderBy, preloadSize int) (sl pbv1.SeriesList, err error) { +func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (sl pbv1.SeriesList, frl FieldResultList, err error) { tracer := query.GetTracer(ctx) if tracer != nil { var span *query.Span @@ -186,18 +194,30 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter span.Stop() }() } - seriesList, err := s.searchPrimary(ctx, series) + seriesList, fieldResultList, err := s.searchPrimary(ctx, series, opts.Projection) if err != nil { - return nil, err + return nil, nil, err } pl := seriesList.ToList() - if filter != nil && filter != logical.ENode { + if opts.Filter != nil && opts.Filter != logical.ENode { var plFilter posting.List func() { if tracer != nil { span, _ := tracer.StartSpan(ctx, "filter") - span.Tag("exp", filter.String()) + span.Tag("exp", opts.Filter.String()) + var projectionStrBuilder strings.Builder + if len(opts.Projection) > 0 { + projectionStrBuilder.WriteString("[") + for i, p := range opts.Projection { + if i > 0 { + projectionStrBuilder.WriteString(", ") + } + projectionStrBuilder.WriteRune(rune(p.IndexRuleID)) + } + projectionStrBuilder.WriteString("]") + span.Tagf("projection", "%s", projectionStrBuilder.String()) + } defer func() { if err != nil { span.Error(err) @@ -208,7 +228,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter span.Stop() }() } - if plFilter, err = filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { + if plFilter, err = opts.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { return s.store, nil }, 0); err != nil { return @@ -219,21 +239,22 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter err = pl.Intersect(plFilter) }() if err != nil { - return nil, err + return nil, nil, err } } - if order == nil || order.Index == nil { - return filterSeriesList(seriesList, pl), nil + if opts.Order == nil || opts.Order.Index == nil { + sl, frl = filterSeriesList(seriesList, fieldResultList, pl) + return sl, frl, nil } fieldKey := index.FieldKey{ - IndexRuleID: order.Index.GetMetadata().Id, + IndexRuleID: opts.Order.Index.GetMetadata().Id, } var span *query.Span if tracer != nil { span, _ = tracer.StartSpan(ctx, "sort") - span.Tagf("preload", "%d", preloadSize) + span.Tagf("preload", "%d", opts.PreloadSize) defer func() { if err != nil { span.Error(err) @@ -241,15 +262,17 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter span.Stop() }() } - iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort, preloadSize) + iter, err := s.store.Iterator(fieldKey, rangeOpts, + opts.Order.Sort, opts.PreloadSize) if err != nil { - return nil, err + return nil, nil, err } defer func() { err = multierr.Append(err, iter.Close()) }() var sortedSeriesList pbv1.SeriesList + var sortedFieldResultList FieldResultList var r int for iter.Next() { r++ @@ -257,36 +280,45 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter if !pl.Contains(docID) { continue } - sortedSeriesList = appendSeriesList(sortedSeriesList, seriesList, common.SeriesID(docID)) + 
sortedSeriesList, sortedFieldResultList = appendSeriesList( + sortedSeriesList, seriesList, + sortedFieldResultList, fieldResultList, + common.SeriesID(docID)) if err != nil { - return nil, err + return nil, nil, err } } if span != nil { span.Tagf("rounds", "%d", r) span.Tagf("size", "%d", len(sortedSeriesList)) } - return sortedSeriesList, err + return sortedSeriesList, sortedFieldResultList, err } -func filterSeriesList(seriesList pbv1.SeriesList, filter posting.List) pbv1.SeriesList { +func filterSeriesList(seriesList pbv1.SeriesList, fieldResultList FieldResultList, filter posting.List) (pbv1.SeriesList, FieldResultList) { for i := 0; i < len(seriesList); i++ { if !filter.Contains(uint64(seriesList[i].ID)) { seriesList = append(seriesList[:i], seriesList[i+1:]...) + if fieldResultList != nil { + fieldResultList = append(fieldResultList[:i], fieldResultList[i+1:]...) + } i-- } } - return seriesList + return seriesList, fieldResultList } -func appendSeriesList(dest, src pbv1.SeriesList, target common.SeriesID) pbv1.SeriesList { +func appendSeriesList(dest, src pbv1.SeriesList, destFRL, srcFRL FieldResultList, target common.SeriesID) (pbv1.SeriesList, FieldResultList) { for i := 0; i < len(src); i++ { if target == src[i].ID { dest = append(dest, src[i]) + if srcFRL != nil { + destFRL = append(destFRL, srcFRL[i]) + } break } } - return dest + return dest, destFRL } func (s *seriesIndex) Close() error { diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index e28a6f54..0e61b623 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -161,7 +161,7 @@ func TestSeriesIndex_Primary(t *testing.T) { seriesQuery.EntityValues = tt.entityValues[i] seriesQueries = append(seriesQueries, seriesQuery) } - sl, err := si.searchPrimary(ctx, seriesQueries) + sl, _, err := si.searchPrimary(ctx, seriesQueries, nil) require.NoError(t, err) require.Equal(t, len(tt.entityValues), len(sl)) assert.Equal(t, tt.subject, sl[0].Subject) diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index b05fb785..1ccf03d7 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -64,10 +64,24 @@ var ( // SupplyTSDB allows getting a tsdb's runtime. type SupplyTSDB[T TSTable] func() T +// IndexSearchOpts is the options for searching index. +type IndexSearchOpts struct { + Filter index.Filter + Order *model.OrderBy + Projection []index.FieldKey + PreloadSize int +} + +// FieldResult is the result of a field. +type FieldResult map[string][]byte + +// FieldResultList is a list of FieldResult. +type FieldResultList []FieldResult + // IndexDB is the interface of index database. type IndexDB interface { Write(docs index.Documents) error - Search(ctx context.Context, series []*pbv1.Series, filter index.Filter, order *model.OrderBy, preloadSize int) (pbv1.SeriesList, error) + Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (pbv1.SeriesList, FieldResultList, error) } // TSDB allows listing and getting shard details. 
diff --git a/banyand/measure/block.go b/banyand/measure/block.go index a758fe3c..93376790 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -462,7 +462,7 @@ func (bc *blockCursor) init(p *part, bm *blockMetadata, queryOpts queryOptions) bc.fieldProjection = queryOpts.FieldProjection } -func (bc *blockCursor) copyAllTo(r *model.MeasureResult, entityValuesAll map[common.SeriesID]map[string]*modelv1.TagValue, +func (bc *blockCursor) copyAllTo(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, tagProjection []model.TagProjection, desc bool, ) { var idx, offset int @@ -484,9 +484,9 @@ func (bc *blockCursor) copyAllTo(r *model.MeasureResult, entityValuesAll map[com slices.Reverse(r.Timestamps) slices.Reverse(r.Versions) } - var entityValues map[string]*modelv1.TagValue - if entityValuesAll != nil { - entityValues = entityValuesAll[r.SID] + var indexValue map[string]*modelv1.TagValue + if storedIndexValue != nil { + indexValue = storedIndexValue[r.SID] } OUTER: for _, tp := range tagProjection { @@ -498,10 +498,10 @@ OUTER: t := model.Tag{ Name: tagName, } - if entityValues != nil && entityValues[tagName] != nil { + if indexValue != nil && indexValue[tagName] != nil { t.Values = make([]*modelv1.TagValue, size) for i := 0; i < size; i++ { - t.Values[i] = entityValues[tagName] + t.Values[i] = indexValue[tagName] } tf.Tags = append(tf.Tags, t) continue @@ -564,15 +564,15 @@ OUTER: } } -func (bc *blockCursor) copyTo(r *model.MeasureResult, entityValuesAll map[common.SeriesID]map[string]*modelv1.TagValue, +func (bc *blockCursor) copyTo(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, tagProjection []model.TagProjection, ) { r.SID = bc.bm.seriesID r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx]) r.Versions = append(r.Versions, bc.versions[bc.idx]) - var entityValues map[string]*modelv1.TagValue - if entityValuesAll != nil { - entityValues = entityValuesAll[r.SID] + var indexValue map[string]*modelv1.TagValue + if storedIndexValue != nil { + indexValue = storedIndexValue[r.SID] } if len(r.TagFamilies) == 0 { for _, tp := range tagProjection { @@ -593,8 +593,8 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, entityValuesAll map[common var cf *columnFamily for j := range r.TagFamilies[i].Tags { tagName := r.TagFamilies[i].Tags[j].Name - if entityValues != nil && entityValues[tagName] != nil { - r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, entityValues[tagName]) + if indexValue != nil && indexValue[tagName] != nil { + r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, indexValue[tagName]) continue } if cf == nil { diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index c05c4c99..f278735e 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -50,17 +50,18 @@ type option struct { } type measure struct { - databaseSupplier schema.Supplier - l *logger.Logger - schema *databasev1.Measure - processorManager *topNProcessorManager - name string - group string - indexRules []*databasev1.IndexRule - indexRuleLocators partition.IndexRuleLocator - topNAggregations []*databasev1.TopNAggregation - interval time.Duration - shardNum uint32 + databaseSupplier schema.Supplier + l *logger.Logger + schema *databasev1.Measure + processorManager *topNProcessorManager + name string + group string + indexRules []*databasev1.IndexRule + indexRuleLocators partition.IndexRuleLocator + fieldIndexLocation 
partition.FieldIndexLocation + topNAggregations []*databasev1.TopNAggregation + interval time.Duration + shardNum uint32 } func (s *measure) startSteamingManager(pipeline queue.Queue) error { @@ -102,7 +103,7 @@ func (s *measure) parseSpec() (err error) { if s.schema.Interval != "" { s.interval, err = timestamp.ParseDuration(s.schema.Interval) } - s.indexRuleLocators = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules) + s.indexRuleLocators, s.fieldIndexLocation = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules) return err } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 19645d68..ae37521a 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -32,6 +32,7 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" @@ -96,17 +97,16 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr } }() - sl, tables, err := s.searchSeriesList(ctx, series, mqo, result.segments) + sids, tables, storedIndexValue, newTagProjection, err := s.searchSeriesList(ctx, series, mqo, result.segments) if err != nil { return nil, err } - if len(sl) < 1 { + if len(sids) < 1 { return &result, nil } - var sids []common.SeriesID - for i := range sl { - sids = append(sids, sl[i].ID) - } + result.tagProjection = mqo.TagProjection + mqo.TagProjection = newTagProjection + result.storedIndexValue = storedIndexValue var parts []*part qo := queryOptions{ MeasureQueryOptions: mqo, @@ -127,7 +127,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr result.snapshots = append(result.snapshots, s) } - if err = s.searchBlocks(ctx, &result, sl, sids, parts, qo); err != nil { + if err = s.searchBlocks(ctx, &result, sids, parts, qo); err != nil { return nil, err } @@ -147,28 +147,97 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr return &result, nil } +type tagNameWithType struct { + name string + typ pbv1.ValueType +} + func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions, segments []storage.Segment[*tsTable, option], -) (sl pbv1.SeriesList, tables []*tsTable, err error) { +) (sl []common.SeriesID, tables []*tsTable, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, + newTagProjection []model.TagProjection, err error, +) { + var indexProjection []index.FieldKey + fieldToValueType := make(map[string]tagNameWithType) + var projectedEntityOffsets map[string]int + newTagProjection = make([]model.TagProjection, 0) + for _, tp := range mqo.TagProjection { + var tagProjection model.TagProjection + TAG: + for _, n := range tp.Names { + for i := range s.schema.GetEntity().GetTagNames() { + if n == s.schema.GetEntity().GetTagNames()[i] { + if projectedEntityOffsets == nil { + projectedEntityOffsets = make(map[string]int) + } + projectedEntityOffsets[n] = i + continue TAG + } + } + if fields, ok := s.fieldIndexLocation[tp.Family]; ok { + if field, ok := fields[n]; ok { + indexProjection = append(indexProjection, field.Key) + fieldToValueType[field.Key.Marshal()] = tagNameWithType{ + 
name: n, + typ: field.Type, + } + continue TAG + } + } + tagProjection.Family = tp.Family + tagProjection.Names = append(tagProjection.Names, n) + } + if tagProjection.Family != "" { + newTagProjection = append(newTagProjection, tagProjection) + } + } seriesFilter := roaring.NewPostingList() for i := range segments { - tables = append(tables, segments[i].Tables()...) - sll, err := segments[i].IndexDB().Search(ctx, series, mqo.Filter, mqo.Order, preloadSize) + sll, fieldResultList, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{ + Filter: mqo.Filter, + Order: mqo.Order, + PreloadSize: preloadSize, + Projection: indexProjection, + }) if err != nil { - return nil, nil, err + return nil, nil, nil, nil, err + } + if len(sll) > 0 { + tables = append(tables, segments[i].Tables()...) } for j := range sll { if seriesFilter.Contains(uint64(sll[j].ID)) { continue } - sl = append(sl, sll[j]) seriesFilter.Insert(uint64(sll[j].ID)) + sl = append(sl, sll[j].ID) + if projectedEntityOffsets == nil && fieldResultList == nil { + continue + } + if storedIndexValue == nil { + storedIndexValue = make(map[common.SeriesID]map[string]*modelv1.TagValue) + } + tagValues := make(map[string]*modelv1.TagValue) + storedIndexValue[sll[j].ID] = tagValues + for name, offset := range projectedEntityOffsets { + tagValues[name] = sll[j].EntityValues[offset] + } + if fieldResultList == nil { + continue + } + for f, v := range fieldResultList[j] { + if tnt, ok := fieldToValueType[f]; ok { + tagValues[tnt.name] = mustDecodeTagValue(tnt.typ, v) + } else { + logger.Panicf("unknown field %s not found in fieldToValueType", f) + } + } } } - return sl, tables, nil + return sl, tables, storedIndexValue, newTagProjection, nil } -func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sl pbv1.SeriesList, sids []common.SeriesID, parts []*part, qo queryOptions) error { +func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids []common.SeriesID, parts []*part, qo queryOptions) error { bma := generateBlockMetadataArray() defer releaseBlockMetadataArray(bma) defFn := startBlockScanSpan(ctx, len(sids), parts, result) @@ -183,26 +252,9 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sl pbv1 if tstIter.Error() != nil { return fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } - projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo, result) - result.tagProjection = qo.TagProjection - qo.TagProjection = tagProjectionOnPart - for tstIter.nextBlock() { bc := generateBlockCursor() p := tstIter.piHeap[0] - - seriesID := p.curBlock.seriesID - if result.entityValues != nil && result.entityValues[seriesID] == nil { - for i := range sl { - if sl[i].ID == seriesID { - tag := make(map[string]*modelv1.TagValue) - for name, offset := range projectedEntityOffsets { - tag[name] = sl[i].EntityValues[offset] - } - result.entityValues[seriesID] = tag - } - } - } bc.init(p.p, p.curBlock, qo) result.data = append(result.data, bc) } @@ -216,34 +268,6 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sl pbv1 return nil } -func (s *measure) parseTagProjection(qo queryOptions, result *queryResult) (projectedEntityOffsets map[string]int, tagProjectionOnPart []model.TagProjection) { - projectedEntityOffsets = make(map[string]int) - for i := range qo.TagProjection { - var found bool - for j := range qo.TagProjection[i].Names { - for k := range s.schema.GetEntity().GetTagNames() { - if qo.TagProjection[i].Names[j] == 
s.schema.GetEntity().GetTagNames()[k] { - projectedEntityOffsets[qo.TagProjection[i].Names[j]] = k - if result.entityValues == nil { - result.entityValues = make(map[common.SeriesID]map[string]*modelv1.TagValue) - } - } else { - if !found { - found = true - tagProjectionOnPart = append(tagProjectionOnPart, model.TagProjection{ - Family: qo.TagProjection[i].Family, - }) - } - tagProjectionOnPart[len(tagProjectionOnPart)-1].Names = append( - tagProjectionOnPart[len(tagProjectionOnPart)-1].Names, - qo.TagProjection[i].Names[j]) - } - } - } - } - return -} - func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue { if value == nil { return pbv1.NullTagValue @@ -396,15 +420,15 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue { } type queryResult struct { - sidToIndex map[common.SeriesID]int - entityValues map[common.SeriesID]map[string]*modelv1.TagValue - tagProjection []model.TagProjection - data []*blockCursor - snapshots []*snapshot - segments []storage.Segment[*tsTable, option] - loaded bool - orderByTS bool - ascTS bool + sidToIndex map[common.SeriesID]int + storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue + tagProjection []model.TagProjection + data []*blockCursor + snapshots []*snapshot + segments []storage.Segment[*tsTable, option] + loaded bool + orderByTS bool + ascTS bool } func (qr *queryResult) Pull() *model.MeasureResult { @@ -451,12 +475,12 @@ func (qr *queryResult) Pull() *model.MeasureResult { if len(qr.data) == 1 { r := &model.MeasureResult{} bc := qr.data[0] - bc.copyAllTo(r, qr.entityValues, qr.tagProjection, qr.orderByTimestampDesc()) + bc.copyAllTo(r, qr.storedIndexValue, qr.tagProjection, qr.orderByTimestampDesc()) qr.data = qr.data[:0] releaseBlockCursor(bc) return r } - return qr.merge(qr.entityValues, qr.tagProjection) + return qr.merge(qr.storedIndexValue, qr.tagProjection) } func (qr *queryResult) Release() { @@ -531,7 +555,7 @@ func (qr *queryResult) orderByTimestampDesc() bool { return qr.orderByTS && !qr.ascTS } -func (qr *queryResult) merge(entityValuesAll map[common.SeriesID]map[string]*modelv1.TagValue, +func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, tagProjection []model.TagProjection, ) *model.MeasureResult { step := 1 @@ -555,7 +579,7 @@ func (qr *queryResult) merge(entityValuesAll map[common.SeriesID]map[string]*mod logger.Panicf("following parts version should be less or equal to the previous one") } } else { - topBC.copyTo(result, entityValuesAll, tagProjection) + topBC.copyTo(result, storedIndexValue, tagProjection) lastVersion = topBC.versions[topBC.idx] } diff --git a/banyand/measure/write.go b/banyand/measure/write.go index 5b2e695d..dadeb612 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -162,7 +162,8 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me IndexRuleID: r.GetMetadata().GetId(), Analyzer: r.Analyzer, }, - Term: encodeTagValue.value, + Term: encodeTagValue.value, + Store: true, }) } else { for _, val := range encodeTagValue.valueArr { @@ -171,10 +172,12 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me IndexRuleID: r.GetMetadata().GetId(), Analyzer: r.Analyzer, }, - Term: val, + Term: val, + Store: true, }) } } + continue } _, isEntity := stm.indexRuleLocators.EntitySet[t.Name] if tagFamilySpec.Tags[j].IndexedOnly || isEntity { diff --git a/banyand/stream/index.go b/banyand/stream/index.go index adcfe396..c3f4cdb2 100644 --- 
a/banyand/stream/index.go +++ b/banyand/stream/index.go @@ -54,7 +54,7 @@ func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds int64 func (e *elementIndex) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preloadSize int, -) (index.FieldIterator[*index.ItemRef], error) { +) (index.FieldIterator[*index.DocumentResult], error) { iter, err := e.store.Sort(sids, fieldKey, order, timeRange, preloadSize) if err != nil { return nil, err diff --git a/banyand/stream/query.go b/banyand/stream/query.go index b70ce0b4..9dff205f 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -140,7 +140,7 @@ type queryOptions struct { } type queryResult struct { - sortingIter itersort.Iterator[*index.ItemRef] + sortingIter itersort.Iterator[*index.DocumentResult] tagNameIndex map[string]partition.TagLocator schema *databasev1.Stream tabs []*tsTable @@ -545,7 +545,7 @@ func indexSearch(sqo model.StreamQueryOptions, func (s *stream) indexSort(sqo model.StreamQueryOptions, tabs []*tsTable, seriesList pbv1.SeriesList, -) (itersort.Iterator[*index.ItemRef], error) { +) (itersort.Iterator[*index.DocumentResult], error) { if sqo.Order == nil || sqo.Order.Index == nil { return nil, nil } @@ -554,19 +554,19 @@ func (s *stream) indexSort(sqo model.StreamQueryOptions, tabs []*tsTable, return nil, err } desc := sqo.Order != nil && sqo.Order.Index == nil && sqo.Order.Sort == modelv1.Sort_SORT_DESC - return itersort.NewItemIter[*index.ItemRef](iters, desc), nil + return itersort.NewItemIter[*index.DocumentResult](iters, desc), nil } func (s *stream) buildItersByIndex(tables []*tsTable, seriesList pbv1.SeriesList, sqo model.StreamQueryOptions, -) (iters []itersort.Iterator[*index.ItemRef], err error) { +) (iters []itersort.Iterator[*index.DocumentResult], err error) { indexRuleForSorting := sqo.Order.Index if len(indexRuleForSorting.Tags) != 1 { return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags)) } sids := seriesList.IDs() for _, tw := range tables { - var iter index.FieldIterator[*index.ItemRef] + var iter index.FieldIterator[*index.DocumentResult] fieldKey := index.FieldKey{ IndexRuleID: indexRuleForSorting.GetMetadata().GetId(), Analyzer: indexRuleForSorting.GetAnalyzer(), diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 13a95f5c..d3e901e4 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -90,7 +90,7 @@ func (s *stream) Close() error { func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() - s.indexRuleLocators = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules) + s.indexRuleLocators, _ = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules) } type streamSpec struct { diff --git a/pkg/index/index.go b/pkg/index/index.go index 11cef0ab..5d5bf91e 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -55,6 +55,7 @@ type Field struct { Term []byte Key FieldKey NoSort bool + Store bool } // RangeOpts contains options to performance a continuous scan. @@ -92,17 +93,18 @@ func (r RangeOpts) Between(value []byte) int { return 0 } -// ItemRef represents a reference to an item. -type ItemRef struct { - Term []byte - SeriesID common.SeriesID - DocID uint64 - Timestamp int64 +// DocumentResult represents a document in a index. 
+type DocumentResult struct { + Values map[string][]byte + SortedValue []byte + SeriesID common.SeriesID + DocID uint64 + Timestamp int64 } // SortedField returns the value of the sorted field. -func (ir ItemRef) SortedField() []byte { - return ir.Term +func (ir DocumentResult) SortedField() []byte { + return ir.SortedValue } // FieldIterator allows iterating over a field's posting values. @@ -121,8 +123,8 @@ func (i *dummyIterator) Next() bool { return false } -func (i *dummyIterator) Val() *ItemRef { - return &ItemRef{} +func (i *dummyIterator) Val() *DocumentResult { + return &DocumentResult{} } func (i *dummyIterator) Close() error { @@ -152,8 +154,8 @@ type Writer interface { // FieldIterable allows building a FieldIterator. type FieldIterable interface { - Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator[*ItemRef], err error) - Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*ItemRef], error) + Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator[*DocumentResult], err error) + Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error) } // Searcher allows searching a field either by its key or by its key and term. @@ -183,11 +185,17 @@ func (s Series) String() string { return fmt.Sprintf("%s:%d", s.EntityValues, s.ID) } +// SeriesDocument represents a series document in a index. +type SeriesDocument struct { + Fields map[string][]byte + Key Series +} + // SeriesStore is an abstract of a series repository. type SeriesStore interface { Store // Search returns a list of series that match the given matchers. - Search(context.Context, []SeriesMatcher) ([]Series, error) + Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, error) } // SeriesMatcherType represents the type of series matcher. 
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index aa9e4532..4d831fb2 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -59,6 +59,7 @@ var ( defaultUpper = convert.Uint64ToBytes(math.MaxUint64) defaultLower = convert.Uint64ToBytes(0) defaultRangePreloadSize = 1000 + defaultProjection = []string{docIDField} ) var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer @@ -119,6 +120,9 @@ func (s *store) Batch(batch index.Batch) error { if !f.NoSort { tf.StoreValue().Sortable() } + if f.Store { + tf.StoreValue() + } if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED { tf = tf.WithAnalyzer(analyzers[f.Key.Analyzer]) } @@ -169,7 +173,9 @@ func (s *store) Close() error { return s.writer.Close() } -func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, preLoadSize int) (iter index.FieldIterator[*index.ItemRef], err error) { +func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, + order modelv1.Sort, preLoadSize int, +) (iter index.FieldIterator[*index.DocumentResult], err error) { if termRange.Lower != nil && termRange.Upper != nil && bytes.Compare(termRange.Lower, termRange.Upper) > 0 { @@ -221,7 +227,6 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord reader: reader, sortedKey: sortedKey, size: preLoadSize, - sid: fieldKey.SeriesID, closer: s.closer, } return result, nil @@ -247,13 +252,13 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { if err != nil { return nil, err } - iter := newBlugeMatchIterator(documentMatchIterator, reader, nil) + iter := newBlugeMatchIterator(documentMatchIterator, reader, defaultProjection) defer func() { err = multierr.Append(err, iter.Close()) }() list = roaring.NewPostingList() for iter.Next() { - list.Insert(iter.Val().docID) + list.Insert(iter.Val().DocID) } return list, err } @@ -280,13 +285,13 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, if err != nil { return nil, err } - iter := newBlugeMatchIterator(documentMatchIterator, reader, nil) + iter := newBlugeMatchIterator(documentMatchIterator, reader, defaultProjection) defer func() { err = multierr.Append(err, iter.Close()) }() list := roaring.NewPostingList() for iter.Next() { - list.Insert(iter.Val().docID) + list.Insert(iter.Val().DocID) } return list, err } @@ -309,28 +314,25 @@ func (s *store) SizeOnDisk() int64 { return int64(bytes) } -type searchResult struct { - values map[string][]byte - seriesID common.SeriesID - docID uint64 - timestamp int64 -} - type blugeMatchIterator struct { - delegated search.DocumentMatchIterator - err error - closer io.Closer - current searchResult + delegated search.DocumentMatchIterator + err error + closer io.Closer + needToLoadFields []string + current index.DocumentResult } -func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer, needToLoadFields []string) blugeMatchIterator { +func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer, + needToLoadFields []string, +) blugeMatchIterator { bmi := blugeMatchIterator{ - delegated: delegated, - closer: closer, - current: searchResult{values: make(map[string][]byte, len(needToLoadFields))}, + delegated: delegated, + closer: closer, + needToLoadFields: needToLoadFields, + current: index.DocumentResult{Values: make(map[string][]byte, len(needToLoadFields))}, } - for i := range needToLoadFields { - 
bmi.current.values[needToLoadFields[i]] = nil + for _, f := range needToLoadFields { + bmi.current.Values[f] = nil } return bmi } @@ -345,25 +347,32 @@ func (bmi *blugeMatchIterator) Next() bool { bmi.err = io.EOF return false } - for i := range bmi.current.values { - bmi.current.values[i] = nil + for i := range bmi.current.Values { + bmi.current.Values[i] = nil + } + bmi.current.DocID = 0 + bmi.current.SeriesID = 0 + bmi.current.Timestamp = 0 + bmi.current.SortedValue = nil + if len(match.SortValue) > 0 { + bmi.current.SortedValue = match.SortValue[0] } err := match.VisitStoredFields(func(field string, value []byte) bool { switch field { case docIDField: - bmi.current.docID = convert.BytesToUint64(value) + bmi.current.DocID = convert.BytesToUint64(value) case seriesIDField: - bmi.current.seriesID = common.SeriesID(convert.BytesToUint64(value)) + bmi.current.SeriesID = common.SeriesID(convert.BytesToUint64(value)) case timestampField: ts, err := bluge.DecodeDateTime(value) if err != nil { bmi.err = err return false } - bmi.current.timestamp = ts.UnixNano() + bmi.current.Timestamp = ts.UnixNano() default: - if _, ok := bmi.current.values[field]; ok { - bmi.current.values[field] = bytes.Clone(value) + if _, ok := bmi.current.Values[field]; ok { + bmi.current.Values[field] = bytes.Clone(value) } } return true @@ -372,7 +381,7 @@ func (bmi *blugeMatchIterator) Next() bool { return bmi.err == nil } -func (bmi *blugeMatchIterator) Val() searchResult { +func (bmi *blugeMatchIterator) Val() index.DocumentResult { return bmi.current } diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index dadc0e64..02852397 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -19,6 +19,7 @@ package inverted import ( + "bytes" "context" "github.com/blugelabs/bluge" @@ -30,10 +31,10 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index" ) -var emptySeries = make([]index.Series, 0) +var emptySeries = make([]index.SeriesDocument, 0) // Search implements index.SeriesStore. 
-func (s *store) Search(ctx context.Context, seriesMatchers []index.SeriesMatcher) ([]index.Series, error) { +func (s *store) Search(ctx context.Context, seriesMatchers []index.SeriesMatcher, projection []index.FieldKey) ([]index.SeriesDocument, error) { if len(seriesMatchers) == 0 { return emptySeries, nil } @@ -77,33 +78,47 @@ func (s *store) Search(ctx context.Context, seriesMatchers []index.SeriesMatcher if err != nil { return nil, err } - return parseResult(dmi) + return parseResult(dmi, projection) } -func parseResult(dmi search.DocumentMatchIterator) ([]index.Series, error) { - result := make([]index.Series, 0, 10) +func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey) ([]index.SeriesDocument, error) { + result := make([]index.SeriesDocument, 0, 10) next, err := dmi.Next() docIDMap := make(map[uint64]struct{}) + fields := make([]string, 0, len(loadedFields)) + for i := range loadedFields { + fields = append(fields, loadedFields[i].Marshal()) + } for err == nil && next != nil { - var series index.Series + var doc index.SeriesDocument + if len(loadedFields) > 0 { + doc.Fields = make(map[string][]byte) + for i := range loadedFields { + doc.Fields[fields[i]] = nil + } + } err = next.VisitStoredFields(func(field string, value []byte) bool { - if field == docIDField { + switch field { + case docIDField: id := convert.BytesToUint64(value) if _, ok := docIDMap[id]; !ok { - series.ID = common.SeriesID(convert.BytesToUint64(value)) + doc.Key.ID = common.SeriesID(convert.BytesToUint64(value)) docIDMap[id] = struct{}{} } - } - if field == entityField { - series.EntityValues = value + case entityField: + doc.Key.EntityValues = value + default: + if _, ok := doc.Fields[field]; ok { + doc.Fields[field] = bytes.Clone(value) + } } return true }) if err != nil { return nil, errors.WithMessage(err, "visit stored fields") } - if series.ID > 0 { - result = append(result, series) + if doc.Key.ID > 0 { + result = append(result, doc) } next, err = dmi.Next() } diff --git a/pkg/index/inverted/inverted_series_test.go b/pkg/index/inverted/inverted_series_test.go index b62ef4af..03a5e181 100644 --- a/pkg/index/inverted/inverted_series_test.go +++ b/pkg/index/inverted/inverted_series_test.go @@ -25,13 +25,23 @@ import ( "github.com/stretchr/testify/require" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" ) +var ( + fieldKeyDuration = index.FieldKey{ + IndexRuleID: indexRuleID, + } + fieldKeyServiceName = index.FieldKey{ + IndexRuleID: 6, + } +) + func TestStore_Search(t *testing.T) { - tester := assert.New(t) - path, fn := setUp(require.New(t)) + tester := require.New(t) + path, fn := setUp(tester) s, err := NewStore(StoreOpts{ Path: path, Logger: logger.GetLogger("test"), @@ -47,32 +57,120 @@ func TestStore_Search(t *testing.T) { // Test cases tests := []struct { - term [][]byte - want []index.Series + term [][]byte + want []index.SeriesDocument + projection []index.FieldKey }{ { term: [][]byte{[]byte("test1")}, - want: []index.Series{ + want: []index.SeriesDocument{ { - ID: common.SeriesID(1), - EntityValues: []byte("test1"), + Key: index.Series{ + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, }, }, }, { - term: [][]byte{[]byte("test1"), []byte("test2"), []byte("test3"), []byte("foo")}, - want: []index.Series{ + term: [][]byte{[]byte("test1"), []byte("test2"), []byte("test3"), []byte("foo")}, + projection: 
[]index.FieldKey{fieldKeyDuration, fieldKeyServiceName}, + want: []index.SeriesDocument{ { - ID: common.SeriesID(1), - EntityValues: []byte("test1"), + Key: index.Series{ + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): nil, + fieldKeyServiceName.Marshal(): nil, + }, }, { - ID: common.SeriesID(2), - EntityValues: []byte("test2"), + Key: index.Series{ + ID: common.SeriesID(2), + EntityValues: []byte("test2"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(100)), + fieldKeyServiceName.Marshal(): []byte("svc2"), + }, }, { - ID: common.SeriesID(3), - EntityValues: []byte("test3"), + Key: index.Series{ + ID: common.SeriesID(3), + EntityValues: []byte("test3"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(500)), + fieldKeyServiceName.Marshal(): nil, + }, + }, + }, + }, + { + term: [][]byte{[]byte("test1"), []byte("test2"), []byte("test3"), []byte("foo")}, + projection: []index.FieldKey{fieldKeyDuration}, + want: []index.SeriesDocument{ + { + Key: index.Series{ + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): nil, + }, + }, + { + Key: index.Series{ + ID: common.SeriesID(2), + EntityValues: []byte("test2"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(100)), + }, + }, + { + Key: index.Series{ + ID: common.SeriesID(3), + EntityValues: []byte("test3"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(500)), + }, + }, + }, + }, + { + term: [][]byte{[]byte("test1"), []byte("test2"), []byte("test3"), []byte("foo")}, + projection: []index.FieldKey{fieldKeyServiceName}, + want: []index.SeriesDocument{ + { + Key: index.Series{ + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, + Fields: map[string][]byte{ + fieldKeyServiceName.Marshal(): nil, + }, + }, + { + Key: index.Series{ + ID: common.SeriesID(2), + EntityValues: []byte("test2"), + }, + Fields: map[string][]byte{ + fieldKeyServiceName.Marshal(): []byte("svc2"), + }, + }, + { + Key: index.Series{ + ID: common.SeriesID(3), + EntityValues: []byte("test3"), + }, + Fields: map[string][]byte{ + fieldKeyServiceName.Marshal(): nil, + }, }, }, }, @@ -92,17 +190,17 @@ func TestStore_Search(t *testing.T) { }) name += string(term) + "-" } - t.Run(name, func(_ *testing.T) { - got, err := s.Search(context.Background(), matchers) - tester.NoError(err) - tester.Equal(tt.want, got) + t.Run(name, func(t *testing.T) { + got, err := s.Search(context.Background(), matchers, tt.projection) + require.NoError(t, err) + assert.Equal(t, tt.want, got) }) } } func TestStore_SearchWildcard(t *testing.T) { - tester := assert.New(t) - path, fn := setUp(require.New(t)) + tester := require.New(t) + path, fn := setUp(tester) s, err := NewStore(StoreOpts{ Path: path, Logger: logger.GetLogger("test"), @@ -118,67 +216,78 @@ func TestStore_SearchWildcard(t *testing.T) { // Test cases tests := []struct { - wildcard []byte - want []index.Series + wildcard []byte + projection []index.FieldKey + want []index.SeriesDocument }{ { wildcard: []byte("test*"), - want: []index.Series{ + want: []index.SeriesDocument{ { - ID: common.SeriesID(1), - EntityValues: []byte("test1"), + Key: index.Series{ + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, }, { - ID: common.SeriesID(2), - EntityValues: []byte("test2"), + Key: index.Series{ + ID: 
common.SeriesID(2), + EntityValues: []byte("test2"), + }, }, { - ID: common.SeriesID(3), - EntityValues: []byte("test3"), + Key: index.Series{ + ID: common.SeriesID(3), + EntityValues: []byte("test3"), + }, }, }, }, { wildcard: []byte("*2"), - want: []index.Series{ + want: []index.SeriesDocument{ { - ID: common.SeriesID(2), - EntityValues: []byte("test2"), + Key: index.Series{ + ID: common.SeriesID(2), + EntityValues: []byte("test2"), + }, }, }, }, { wildcard: []byte("t*st1"), - want: []index.Series{ + want: []index.SeriesDocument{ { - ID: common.SeriesID(1), - EntityValues: []byte("test1"), + Key: index.Series{ + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, }, }, }, { wildcard: []byte("foo*"), - want: []index.Series{}, + want: []index.SeriesDocument{}, }, } for _, tt := range tests { - t.Run(string(tt.wildcard), func(_ *testing.T) { + t.Run(string(tt.wildcard), func(t *testing.T) { got, err := s.Search(context.Background(), []index.SeriesMatcher{ { Type: index.SeriesMatcherTypeWildcard, Match: tt.wildcard, }, - }) - tester.NoError(err) - tester.ElementsMatch(tt.want, got) + }, tt.projection) + require.NoError(t, err) + assert.ElementsMatch(t, tt.want, got) }) } } func TestStore_SearchPrefix(t *testing.T) { - tester := assert.New(t) - path, fn := setUp(require.New(t)) + tester := require.New(t) + path, fn := setUp(tester) s, err := NewStore(StoreOpts{ Path: path, Logger: logger.GetLogger("test"), @@ -194,47 +303,54 @@ func TestStore_SearchPrefix(t *testing.T) { // Test cases tests := []struct { - prefix []byte - want []index.Series + prefix []byte + projection []index.FieldKey + want []index.SeriesDocument }{ { prefix: []byte("test"), - want: []index.Series{ + want: []index.SeriesDocument{ { - ID: common.SeriesID(1), - EntityValues: []byte("test1"), + Key: index.Series{ + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, }, { - ID: common.SeriesID(2), - EntityValues: []byte("test2"), + Key: index.Series{ + ID: common.SeriesID(2), + EntityValues: []byte("test2"), + }, }, { - ID: common.SeriesID(3), - EntityValues: []byte("test3"), + Key: index.Series{ + ID: common.SeriesID(3), + EntityValues: []byte("test3"), + }, }, }, }, { prefix: []byte("foo"), - want: []index.Series{}, + want: []index.SeriesDocument{}, }, } for _, tt := range tests { - t.Run(string(tt.prefix), func(_ *testing.T) { + t.Run(string(tt.prefix), func(t *testing.T) { got, err := s.Search(context.Background(), []index.SeriesMatcher{ { Type: index.SeriesMatcherTypePrefix, Match: tt.prefix, }, - }) - tester.NoError(err) - tester.ElementsMatch(tt.want, got) + }, tt.projection) + require.NoError(t, err) + assert.ElementsMatch(t, tt.want, got) }) } } -func setupData(tester *assert.Assertions, s index.SeriesStore) { +func setupData(tester *require.Assertions, s index.SeriesStore) { series1 := index.Document{ DocID: 1, EntityValues: []byte("test1"), @@ -243,11 +359,30 @@ func setupData(tester *assert.Assertions, s index.SeriesStore) { series2 := index.Document{ DocID: 2, EntityValues: []byte("test2"), + Fields: []index.Field{ + { + Key: fieldKeyDuration, + Term: convert.Int64ToBytes(int64(100)), + Store: true, + }, + { + Key: fieldKeyServiceName, + Term: []byte("svc2"), + Store: true, + }, + }, } series3 := index.Document{ DocID: 3, EntityValues: []byte("test3"), + Fields: []index.Field{ + { + Key: fieldKeyDuration, + Term: convert.Int64ToBytes(int64(500)), + Store: true, + }, + }, } tester.NoError(s.Batch(index.Batch{ Documents: []index.Document{series1, series2, series3, series3}, diff --git 
a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go index de43daed..07b7ffa7 100644 --- a/pkg/index/inverted/inverted_test.go +++ b/pkg/index/inverted/inverted_test.go @@ -36,8 +36,8 @@ import ( ) func TestStore_Match(t *testing.T) { - tester := assert.New(t) - path, fn := setUp(require.New(t)) + tester := require.New(t) + path, fn := setUp(tester) s, err := NewStore(StoreOpts{ Path: path, Logger: logger.GetLogger("test"), @@ -119,7 +119,8 @@ func TestStore_Match(t *testing.T) { } for _, tt := range tests { name := strings.Join(tt.matches, "-") - t.Run(name, func(_ *testing.T) { + t.Run(name, func(t *testing.T) { + tester := assert.New(t) list, err := s.Match(serviceName, tt.matches) if tt.wantErr { tester.Error(err) @@ -184,7 +185,7 @@ func TestStore_SeriesMatch(t *testing.T) { } } -func setup(tester *assert.Assertions, s index.Store, serviceName index.FieldKey) { +func setup(tester *require.Assertions, s index.Store, serviceName index.FieldKey) { var batch index.Batch batch.Documents = append(batch.Documents, index.Document{ diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index 0b8e5a85..c63b4c42 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -35,7 +35,7 @@ import ( func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int, -) (iter index.FieldIterator[*index.ItemRef], err error) { +) (iter index.FieldIterator[*index.DocumentResult], err error) { reader, err := s.writer.Reader() if err != nil { return nil, err @@ -69,27 +69,23 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order mode sortedKey = "-" + sortedKey } result := &sortIterator{ - query: query, - reader: reader, - sortedKey: sortedKey, - sortedField: fk, - size: preLoadSize, - sid: fieldKey.SeriesID, + query: query, + reader: reader, + sortedKey: sortedKey, + size: preLoadSize, } return result, nil } type sortIterator struct { - query bluge.Query - err error - reader *bluge.Reader - current *blugeMatchIterator - closer *run.Closer - sortedKey string - sortedField string - size int - skipped int - sid common.SeriesID + query bluge.Query + err error + reader *bluge.Reader + current *blugeMatchIterator + closer *run.Closer + sortedKey string + size int + skipped int } func (si *sortIterator) Next() bool { @@ -124,7 +120,7 @@ func (si *sortIterator) loadCurrent() bool { return false } - iter := newBlugeMatchIterator(documentMatchIterator, nil, []string{si.sortedField}) + iter := newBlugeMatchIterator(documentMatchIterator, nil, nil) si.current = &iter if si.next() { return true @@ -141,18 +137,12 @@ func (si *sortIterator) next() bool { return false } -func (si *sortIterator) Val() *index.ItemRef { +func (si *sortIterator) Val() *index.DocumentResult { v := si.current.Val() - sv, ok := v.values[si.sortedField] - if !ok { + if v.SortedValue == nil { panic("sorted field not found in document") } - return &index.ItemRef{ - SeriesID: v.seriesID, - DocID: v.docID, - Term: sv, - Timestamp: v.timestamp, - } + return &v } func (si *sortIterator) Close() error { diff --git a/pkg/index/inverted/sort_test.go b/pkg/index/inverted/sort_test.go index e7794038..a03367b4 100644 --- a/pkg/index/inverted/sort_test.go +++ b/pkg/index/inverted/sort_test.go @@ -172,8 +172,8 @@ func TestStore_Sort(t *testing.T) { for iter.Next() { val := iter.Val() got.items = append(got.items, val.DocID) - if val.Term != nil { - got.terms = append(got.terms, val.Term) + if val.SortedValue != 
nil { + got.terms = append(got.terms, val.SortedValue) } } for i := 0; i < 10; i++ { diff --git a/pkg/partition/index.go b/pkg/partition/index.go index 1df11225..9ccb0869 100644 --- a/pkg/partition/index.go +++ b/pkg/partition/index.go @@ -19,6 +19,8 @@ package partition import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/index" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) // IndexRuleLocator is a helper struct to locate the index rule by tag name. @@ -27,9 +29,21 @@ type IndexRuleLocator struct { TagFamilyTRule []map[string]*databasev1.IndexRule } +// FieldWithType is a helper struct to store the field type. +type FieldWithType struct { + Key index.FieldKey + Type pbv1.ValueType +} + +// FieldIndexLocation is a helper struct to store the field index location. +type FieldIndexLocation map[string]map[string]FieldWithType + // ParseIndexRuleLocators returns a IndexRuleLocator based on the tag family spec and index rules. -func ParseIndexRuleLocators(entity *databasev1.Entity, families []*databasev1.TagFamilySpec, indexRules []*databasev1.IndexRule) (locators IndexRuleLocator) { +func ParseIndexRuleLocators(entity *databasev1.Entity, families []*databasev1.TagFamilySpec, + indexRules []*databasev1.IndexRule, +) (locators IndexRuleLocator, fil FieldIndexLocation) { locators.EntitySet = make(map[string]struct{}, len(entity.TagNames)) + fil = make(FieldIndexLocation) for i := range entity.TagNames { locators.EntitySet[entity.TagNames[i]] = struct{}{} } @@ -50,8 +64,19 @@ func ParseIndexRuleLocators(entity *databasev1.Entity, families []*databasev1.Ta ir := findIndexRuleByTagName(families[i].Tags[j].Name) if ir != nil { ttr[families[i].Tags[j].Name] = ir + tagFamily, ok := fil[families[i].Name] + if !ok { + tagFamily = make(map[string]FieldWithType) + fil[families[i].Name] = tagFamily + } + tagFamily[families[i].Tags[j].Name] = FieldWithType{ + Key: index.FieldKey{ + IndexRuleID: ir.Metadata.Id, + }, + Type: pbv1.MustTagValueSpecToValueType(families[i].Tags[j].Type), + } } } } - return locators + return locators, fil } diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go index 064d2022..ec6e757e 100644 --- a/pkg/pb/v1/value.go +++ b/pkg/pb/v1/value.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" @@ -63,6 +64,24 @@ func MustTagValueToValueType(tag *modelv1.TagValue) ValueType { } } +// MustTagValueSpecToValueType converts databasev1.TagType to ValueType. 
+func MustTagValueSpecToValueType(tag databasev1.TagType) ValueType { + switch tag { + case databasev1.TagType_TAG_TYPE_STRING: + return ValueTypeStr + case databasev1.TagType_TAG_TYPE_INT: + return ValueTypeInt64 + case databasev1.TagType_TAG_TYPE_DATA_BINARY: + return ValueTypeBinaryData + case databasev1.TagType_TAG_TYPE_STRING_ARRAY: + return ValueTypeStrArr + case databasev1.TagType_TAG_TYPE_INT_ARRAY: + return ValueTypeInt64Arr + default: + panic("unknown tag value type") + } +} + func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) { dest = append(dest, byte(MustTagValueToValueType(tv))) switch tv.Value.(type) { diff --git a/test/cases/measure/data/input/entity.yaml b/test/cases/measure/data/input/entity.yaml index 40a1c756..8e37f7b8 100644 --- a/test/cases/measure/data/input/entity.yaml +++ b/test/cases/measure/data/input/entity.yaml @@ -20,7 +20,7 @@ groups: ["sw_metric"] tagProjection: tagFamilies: - name: "default" - tags: ["service_id"] + tags: ["service_id", "layer", "name", "short_name"] criteria: le: op: "LOGICAL_OP_AND" diff --git a/test/cases/measure/data/input/entity_in.yaml b/test/cases/measure/data/input/entity_in.yaml index eb19686a..b872a268 100644 --- a/test/cases/measure/data/input/entity_in.yaml +++ b/test/cases/measure/data/input/entity_in.yaml @@ -20,7 +20,7 @@ groups: ["sw_metric"] tagProjection: tagFamilies: - name: "default" - tags: ["id", "service_id"] + tags: ["name", "short_name"] criteria: le: op: "LOGICAL_OP_AND" diff --git a/test/cases/measure/data/input/entity_service.yaml b/test/cases/measure/data/input/entity_service.yaml index 1074be66..b85da375 100644 --- a/test/cases/measure/data/input/entity_service.yaml +++ b/test/cases/measure/data/input/entity_service.yaml @@ -20,7 +20,7 @@ groups: ["sw_metric"] tagProjection: tagFamilies: - name: "default" - tags: ["id", "service_id"] + tags: ["id", "service_id", "layer"] criteria: le: op: "LOGICAL_OP_AND" diff --git a/test/cases/measure/data/input/no_field.yaml b/test/cases/measure/data/input/no_field.yaml index 14454a22..8743c88c 100644 --- a/test/cases/measure/data/input/no_field.yaml +++ b/test/cases/measure/data/input/no_field.yaml @@ -20,7 +20,7 @@ groups: ["sw_metric"] tagProjection: tagFamilies: - name: "default" - tags: ["id", "service_id"] + tags: ["id"] criteria: condition: name: "service_id" diff --git a/test/cases/measure/data/want/entity.yaml b/test/cases/measure/data/want/entity.yaml index 8586f55f..4fc05dc5 100644 --- a/test/cases/measure/data/want/entity.yaml +++ b/test/cases/measure/data/want/entity.yaml @@ -16,10 +16,22 @@ # under the License. 
dataPoints: - - tagFamilies: - - name: default - tags: - - key: service_id - value: - str: - value: service_1 \ No newline at end of file +- tagFamilies: + - name: default + tags: + - key: service_id + value: + str: + value: service_1 + - key: layer + value: + int: + value: "1" + - key: name + value: + str: + value: service_name_1 + - key: short_name + value: + str: + value: service_short_name_1 diff --git a/test/cases/measure/data/want/entity_in.yaml b/test/cases/measure/data/want/entity_in.yaml index 6c269eb0..73ba584e 100644 --- a/test/cases/measure/data/want/entity_in.yaml +++ b/test/cases/measure/data/want/entity_in.yaml @@ -19,23 +19,22 @@ dataPoints: - tagFamilies: - name: default tags: - - key: id + - key: name value: str: - value: "1" - - key: service_id + value: service_name_1 + - key: short_name value: str: - value: service_1 + value: service_short_name_1 - tagFamilies: - name: default tags: - - key: id + - key: name value: str: - value: "2" - - key: service_id + value: service_name_2 + - key: short_name value: str: - value: service_2 - \ No newline at end of file + value: service_short_name_2 diff --git a/test/cases/measure/data/want/entity_service.yaml b/test/cases/measure/data/want/entity_service.yaml index 58825ddc..d1d62da2 100644 --- a/test/cases/measure/data/want/entity_service.yaml +++ b/test/cases/measure/data/want/entity_service.yaml @@ -16,14 +16,18 @@ # under the License. dataPoints: - - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "1" - - key: service_id - value: - str: - value: service_1 \ No newline at end of file +- tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "1" + - key: service_id + value: + str: + value: service_1 + - key: layer + value: + int: + value: "1" diff --git a/test/cases/measure/data/want/no_field.yaml b/test/cases/measure/data/want/no_field.yaml index 58825ddc..b16d1cea 100644 --- a/test/cases/measure/data/want/no_field.yaml +++ b/test/cases/measure/data/want/no_field.yaml @@ -16,14 +16,10 @@ # under the License. dataPoints: - - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "1" - - key: service_id - value: - str: - value: service_1 \ No newline at end of file +- tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "1"
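For readers skimming the diff, here is a minimal sketch of how the reworked series-index search is invoked after this change, pieced together from the hunks in banyand/internal/storage/storage.go and banyand/measure/query.go above. It is not part of the commit: the wrapper name `searchOneSegment` and the literal preload size are illustrative only, while `Filter`, `Order`, `Projection`, and `PreloadSize` are the fields the diff adds to `IndexSearchOpts`, and the second return value carries the stored index values that previously had to be read from data files.

```go
package measure // hypothetical placement; the real call sites live in banyand/measure/query.go

import (
	"context"

	"github.com/apache/skywalking-banyandb/banyand/internal/storage"
	"github.com/apache/skywalking-banyandb/pkg/index"
	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
	"github.com/apache/skywalking-banyandb/pkg/query/model"
)

// searchOneSegment is an illustrative wrapper: filter, order, projection and
// preload size now travel together in IndexSearchOpts instead of being passed
// as separate arguments to IndexDB.Search.
func searchOneSegment(ctx context.Context, idb storage.IndexDB, series []*pbv1.Series,
	filter index.Filter, order *model.OrderBy, projection []index.FieldKey,
) (pbv1.SeriesList, storage.FieldResultList, error) {
	// The returned FieldResultList is parallel to the SeriesList: entry j holds
	// the projected, stored index values for series j, keyed by the marshalled
	// index.FieldKey, which the measure query then decodes back into tag values.
	return idb.Search(ctx, series, storage.IndexSearchOpts{
		Filter:      filter,
		Order:       order,
		Projection:  projection,
		PreloadSize: 1000, // illustrative; the measure query passes its preloadSize constant
	})
}
```

On the write side, the same commit sets `Store: true` on indexed fields in banyand/measure/write.go, so the inverted index keeps the term bytes that this search projects back out.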
