This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b0e2ababad384c08ae8a15a9ea40c77d8607783c
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Nov 15 09:54:17 2024 +0800

    Introduce "index_mode" to save data exclusively in the series index

    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                          |   1 +
 banyand/internal/storage/index.go                   |  64 +-
 banyand/internal/storage/index_test.go              |   4 +-
 banyand/internal/storage/segment.go                 |   3 +
 banyand/internal/storage/storage.go                 |   6 +-
 banyand/measure/query.go                            | 225 ++++---
 banyand/stream/benchmark_test.go                    |   6 +-
 pkg/index/index.go                                  |  28 +-
 pkg/index/inverted/inverted.go                      |  50 +-
 pkg/index/inverted/inverted_series.go               | 121 ++--
 pkg/index/inverted/inverted_series_test.go          | 647 +++++++++++++++++++--
 pkg/index/inverted/query.go                         |  21 +
 pkg/pb/v1/series.go                                 |   1 +
 .../measure/measure_plan_indexscan_local.go         |  46 +-
 pkg/query/logical/measure/topn_plan_localscan.go    |   3 +-
 pkg/query/logical/optimizer.go                      |   3 +-
 pkg/query/logical/plan.go                           |   3 +
 .../logical/stream/stream_plan_indexscan_local.go   |   4 +-
 pkg/query/model/model.go                            |  24 +-
 .../measure/testdata/measures/service_traffic.json  |   1 +
 pkg/timestamp/range.go                              |  16 +
 test/cases/measure/data/input/index_mode_all.yaml   |  23 +
 .../measure/data/input/index_mode_order_desc.yaml   |  26 +
 .../cases/measure/data/input/index_mode_range.yaml  |  30 +
 test/cases/measure/data/want/index_mode_all.yaml    | 107 ++++
 .../measure/data/want/index_mode_order_desc.yaml    |  83 +++
 test/cases/measure/data/want/index_mode_range.yaml  |  39 ++
 test/cases/measure/measure.go                       |   3 +
 28 files changed, 1288 insertions(+), 300 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 38415e5f..85b36cba 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
 - Add the `bydbctl analyze series` command to analyze the series data.
 - Index: Remove sortable field from the stored field. If a field is sortable only, it won't be stored.
 - Index: Support InsertIfAbsent functionality which ensures documents are only inserted if their docIDs are not already present in the current index. There is an exception for documents that carry more index fields than the entity's index fields.
+- Measure: Introduce "index_mode" to save data exclusively in the series index, ideal for non-timeseries measures.
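Enabling the mode is a per-measure schema switch. A minimal sketch, based on the service_traffic.json test fixture updated later in this patch (only "index_mode" is new; the surrounding fields follow the existing measure schema):

    {
      "entity": { "tag_names": ["id"] },
      "index_mode": true
    }

With index_mode set, the measure's data lives only in the series index.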
### Bug Fixes diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index ba3bd443..e25ad7ae 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -31,6 +31,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) func (s *segment[T, O]) IndexDB() IndexDB { @@ -38,7 +39,7 @@ func (s *segment[T, O]) IndexDB() IndexDB { } func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) { - sl, _, _, err := s.index.filter(ctx, series, nil, nil) + sl, _, _, err := s.index.filter(ctx, series, nil, nil, nil) return sl, err } @@ -76,10 +77,8 @@ func (s *seriesIndex) Write(docs index.Documents) error { }) } -var rangeOpts = index.RangeOpts{} - func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series, - projection []index.FieldKey, secondaryQuery index.Query, + projection []index.FieldKey, secondaryQuery index.Query, timeRange *timestamp.TimeRange, ) (sl pbv1.SeriesList, fields FieldResultList, tss []int64, err error) { seriesMatchers := make([]index.SeriesMatcher, len(series)) for i := range series { @@ -88,7 +87,7 @@ func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series, return nil, nil, nil, err } } - indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery) + indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery, timeRange) if err != nil { return nil, nil, nil, err } @@ -182,7 +181,6 @@ func convertIndexSeriesToSeriesList(indexSeries []index.SeriesDocument, hasField var timestamps []int64 for _, s := range indexSeries { var series pbv1.Series - series.ID = s.Key.ID if err := series.Unmarshal(s.Key.EntityValues); err != nil { return nil, nil, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", s.Key.EntityValues) } @@ -197,11 +195,15 @@ func convertIndexSeriesToSeriesList(indexSeries []index.SeriesDocument, hasField return seriesList, fields, timestamps, nil } -func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (sl pbv1.SeriesList, frl FieldResultList, tss []int64, err error) { +func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts, +) (sl pbv1.SeriesList, frl FieldResultList, tss []int64, sortedValues [][]byte, err error) { tracer := query.GetTracer(ctx) if tracer != nil { var span *query.Span span, ctx = tracer.StartSpan(ctx, "seriesIndex.Search") + if opts.Query != nil { + span.Tagf("secondary_query", "%s", opts.Query.String()) + } defer func() { if err != nil { span.Error(err) @@ -211,19 +213,11 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In } if opts.Order == nil || opts.Order.Index == nil { - if opts.Query != nil { - sl, frl, tss, err = s.filter(ctx, series, opts.Projection, opts.Query) - } else { - sl, frl, tss, err = s.filter(ctx, series, opts.Projection, nil) - } + sl, frl, tss, err = s.filter(ctx, series, opts.Projection, opts.Query, opts.TimeRange) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } - return sl, frl, tss, nil - } - - fieldKey := index.FieldKey{ - IndexRuleID: opts.Order.Index.GetMetadata().Id, + return sl, frl, tss, nil, nil } var span *query.Span if tracer != nil { @@ -240,43 +234,43 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In for i := range 
series { seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i]) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } } - query, err := s.store.BuildQuery(seriesMatchers, opts.Query) + query, err := s.store.BuildQuery(seriesMatchers, opts.Query, opts.TimeRange) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } - iter, err := s.store.SeriesSort(ctx, fieldKey, rangeOpts, - opts.Order.Sort, opts.PreloadSize, query, opts.Projection) + iter, err := s.store.SeriesSort(ctx, query, opts.Order, + opts.PreloadSize, opts.Projection) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } defer func() { err = multierr.Append(err, iter.Close()) }() var r int - result := make([]index.SeriesDocument, 0, 10) for iter.Next() { r++ val := iter.Val() - var doc index.SeriesDocument - doc.Fields = maps.Clone(val.Values) - doc.Key.ID = common.SeriesID(val.DocID) - doc.Key.EntityValues = val.EntityValues - result = append(result, doc) - } - sl, frl, tss, err = convertIndexSeriesToSeriesList(result, len(opts.Projection) > 0) - if err != nil { - return nil, nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(result)) + var series pbv1.Series + if err = series.Unmarshal(val.EntityValues); err != nil { + return nil, nil, nil, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", val.EntityValues) + } + sl = append(sl, &series) + tss = append(tss, val.Timestamp) + if len(opts.Projection) > 0 { + frl = append(frl, maps.Clone(val.Values)) + } + sortedValues = append(sortedValues, val.SortedValue) } if span != nil { span.Tagf("query", "%s", iter.Query().String()) span.Tagf("rounds", "%d", r) span.Tagf("size", "%d", len(sl)) } - return sl, frl, tss, err + return sl, frl, tss, sortedValues, err } func (s *seriesIndex) Close() error { diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index 6e4db479..3fd39aed 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -157,11 +158,12 @@ func TestSeriesIndex_Primary(t *testing.T) { seriesQuery.EntityValues = tt.entityValues[i] seriesQueries = append(seriesQueries, &seriesQuery) } - sl, _, _, err := si.filter(ctx, seriesQueries, nil, nil) + sl, _, _, err := si.filter(ctx, seriesQueries, nil, nil, nil) require.NoError(t, err) require.Equal(t, len(tt.entityValues), len(sl)) assert.Equal(t, tt.subject, sl[0].Subject) for i := range tt.expected { + assert.Greater(t, sl[i].ID, common.SeriesID(0)) assert.Equal(t, tt.expected[i][0].GetStr().GetValue(), sl[i].EntityValues[0].GetStr().GetValue()) assert.Equal(t, tt.expected[i][1].GetStr().GetValue(), sl[i].EntityValues[1].GetStr().GetValue()) assert.True(t, sl[0].ID > 0) diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 0f7d816b..02f59581 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -234,6 +234,9 @@ func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) last := len(sc.lst) - 1 for i := range sc.lst { s := sc.lst[last-i] + if 
s.GetTimeRange().End.Before(timeRange.Start) { + break + } if s.Overlapping(timeRange) { s.incRef() tt = append(tt, s) diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index 42502515..aed7a40c 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -36,7 +36,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" - "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -67,7 +66,8 @@ type SupplyTSDB[T TSTable] func() T // IndexSearchOpts is the options for searching index. type IndexSearchOpts struct { Query index.Query - Order *model.OrderBy + Order *index.OrderBy + TimeRange *timestamp.TimeRange Projection []index.FieldKey PreloadSize int } @@ -81,7 +81,7 @@ type FieldResultList []FieldResult // IndexDB is the interface of index database. type IndexDB interface { Write(docs index.Documents) error - Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (pbv1.SeriesList, FieldResultList, []int64, error) + Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (pbv1.SeriesList, FieldResultList, []int64, [][]byte, error) } // TSDB allows listing and getting shard details. diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 32399332..e0d07494 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -18,6 +18,7 @@ package measure import ( + "bytes" "container/heap" "context" "fmt" @@ -145,23 +146,27 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr if mqo.Order == nil { result.ascTS = true - } else if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { - result.ascTS = true - } - switch mqo.OrderByType { - case model.OrderByTypeTime: result.orderByTS = true - case model.OrderByTypeIndex: - result.orderByTS = false - case model.OrderByTypeSeries: - result.orderByTS = false + } else { + if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { + result.ascTS = true + } + switch mqo.Order.Type { + case index.OrderByTypeTime: + result.orderByTS = true + case index.OrderByTypeIndex: + result.orderByTS = false + case index.OrderByTypeSeries: + result.orderByTS = false + } } + return &result, nil } type tagNameWithType struct { - name string - typ pbv1.ValueType + fieldName string + typ pbv1.ValueType } func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions, @@ -190,8 +195,8 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m if field, ok := fields[n]; ok { indexProjection = append(indexProjection, field.Key) fieldToValueType[field.Key.Marshal()] = tagNameWithType{ - name: n, - typ: field.Type, + fieldName: n, + typ: field.Type, } continue TAG } @@ -205,7 +210,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m } seriesFilter := roaring.NewPostingList() for i := range segments { - sll, fieldResultList, _, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{ + sll, fieldResultList, _, _, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{ Query: mqo.Query, Order: mqo.Order, PreloadSize: preloadSize, @@ -239,7 +244,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m } for f, v := 
range fieldResultList[j] { if tnt, ok := fieldToValueType[f]; ok { - tagValues[tnt.name] = mustDecodeTagValue(tnt.typ, v) + tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v) } else { logger.Panicf("unknown field %s not found in fieldToValueType", f) } @@ -251,13 +256,14 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions, segments []storage.Segment[*tsTable, option], -) (*indexQueryResult, error) { - r := &indexQueryResult{ - ctx: ctx, - series: series, - mqo: mqo, - segments: segments, - } +) (*indexSortResult, error) { + defer func() { + for i := range segments { + segments[i].DecRef() + } + }() + r := &indexSortResult{} + var indexProjection []index.FieldKey for _, tp := range mqo.TagProjection { tagFamilyLocation := tagFamilyLocation{ name: tp.Family, @@ -266,6 +272,7 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri } TAG: for _, n := range tp.Names { + tagFamilyLocation.tagNames = append(tagFamilyLocation.tagNames, n) for i := range s.schema.GetEntity().GetTagNames() { if n == s.schema.GetEntity().GetTagNames()[i] { tagFamilyLocation.projectedEntityOffsets[n] = i @@ -274,10 +281,10 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri } if fields, ok := s.fieldIndexLocation[tp.Family]; ok { if field, ok := fields[n]; ok { - r.indexProjection = append(r.indexProjection, field.Key) - tagFamilyLocation.fieldToValueType[field.Key.Marshal()] = tagNameWithType{ - name: n, - typ: field.Type, + indexProjection = append(indexProjection, field.Key) + tagFamilyLocation.fieldToValueType[n] = tagNameWithType{ + fieldName: field.Key.Marshal(), + typ: field.Type, } continue TAG } @@ -286,6 +293,28 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri } r.tfl = append(r.tfl, tagFamilyLocation) } + var err error + opts := storage.IndexSearchOpts{ + Query: mqo.Query, + Order: mqo.Order, + PreloadSize: preloadSize, + Projection: indexProjection, + } + + for i := range segments { + if mqo.TimeRange.Include(segments[i].GetTimeRange()) { + opts.TimeRange = nil + } else { + opts.TimeRange = mqo.TimeRange + } + sr := &segResult{} + sr.sll, sr.frl, sr.timestamps, sr.sortedValues, err = segments[i].IndexDB().Search(ctx, series, opts) + if err != nil { + return nil, err + } + r.segResults = append(r.segResults, sr) + } + heap.Init(&r.segResults) return r, nil } @@ -685,88 +714,112 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo return result } -var bypassVersions = []int64{0} +var bypassVersions = []int64{1} -type indexQueryResult struct { - ctx context.Context - err error - tfl []tagFamilyLocation - indexProjection []index.FieldKey - series []*pbv1.Series - segments []storage.Segment[*tsTable, option] - sll pbv1.SeriesList - frl storage.FieldResultList - timestamps []int64 - mqo model.MeasureQueryOptions - i int +type indexSortResult struct { + tfl []tagFamilyLocation + segResults segResultHeap } // Pull implements model.MeasureQueryResult. 
-func (i *indexQueryResult) Pull() *model.MeasureResult { - if i.i < 0 { - if len(i.segments) < 1 { +func (iqr *indexSortResult) Pull() *model.MeasureResult { + if len(iqr.segResults) < 1 { + return nil + } + if len(iqr.segResults) == 1 { + if iqr.segResults[0].i >= len(iqr.segResults[0].sll) { return nil } - i.sll, i.frl, i.timestamps, i.err = i.segments[0].IndexDB().Search(i.ctx, i.series, storage.IndexSearchOpts{ - Query: i.mqo.Query, - Order: i.mqo.Order, - PreloadSize: preloadSize, - Projection: i.indexProjection, - }) - if i.err != nil { - return &model.MeasureResult{ - Error: i.err, - } - } - i.segments = i.segments[1:] - if len(i.sll) < 1 { - return i.Pull() + sr := iqr.segResults[0] + r := iqr.copyTo(sr) + sr.i++ + if sr.i >= len(sr.sll) { + iqr.segResults = iqr.segResults[:0] } - i.i = 0 + return r } - if i.i >= len(i.sll) { - i.i = -1 - return i.Pull() + top := heap.Pop(&iqr.segResults) + sr := top.(*segResult) + r := iqr.copyTo(sr) + sr.i++ + if sr.i < len(sr.sll) { + heap.Push(&iqr.segResults, sr) } - r := &model.MeasureResult{ - SID: i.sll[i.i].ID, - Timestamps: []int64{i.timestamps[i.i]}, + return r +} + +func (iqr *indexSortResult) Release() {} + +func (iqr *indexSortResult) copyTo(src *segResult) (dest *model.MeasureResult) { + index := src.i + dest = &model.MeasureResult{ + SID: src.sll[index].ID, + Timestamps: []int64{src.timestamps[index]}, Versions: bypassVersions, } - for j := range i.tfl { - tagFamily := model.TagFamily{Name: i.tfl[j].name} - for name, offset := range i.tfl[j].projectedEntityOffsets { - tagFamily.Tags = append(tagFamily.Tags, model.Tag{ - Name: name, - Values: []*modelv1.TagValue{i.sll[i.i].EntityValues[offset]}, - }) + for i := range iqr.tfl { + tagFamily := model.TagFamily{Name: iqr.tfl[i].name} + peo := iqr.tfl[i].projectedEntityOffsets + var fr storage.FieldResult + if src.frl != nil { + fr = src.frl[index] } - if i.frl == nil { - continue - } - for f, v := range i.frl[j] { - if tnt, ok := i.tfl[j].fieldToValueType[f]; ok { + for _, n := range iqr.tfl[i].tagNames { + if offset, ok := peo[n]; ok { + tagFamily.Tags = append(tagFamily.Tags, model.Tag{ + Name: n, + Values: []*modelv1.TagValue{src.sll[index].EntityValues[offset]}, + }) + continue + } + if fr == nil { + continue + } + if tnt, ok := iqr.tfl[i].fieldToValueType[n]; ok { tagFamily.Tags = append(tagFamily.Tags, model.Tag{ - Name: tnt.name, - Values: []*modelv1.TagValue{mustDecodeTagValue(tnt.typ, v)}, + Name: n, + Values: []*modelv1.TagValue{mustDecodeTagValue(tnt.typ, fr[tnt.fieldName])}, }) } else { - return &model.MeasureResult{ - Error: errors.Errorf("unknown field %s not found in fieldToValueType", f), - } + logger.Panicf("unknown field %s not found in fieldToValueType", n) } } - r.TagFamilies = append(r.TagFamilies, tagFamily) + dest.TagFamilies = append(dest.TagFamilies, tagFamily) } - i.i++ - return r -} - -func (i *indexQueryResult) Release() { + return dest } type tagFamilyLocation struct { fieldToValueType map[string]tagNameWithType projectedEntityOffsets map[string]int name string + tagNames []string +} + +type segResult struct { + sll pbv1.SeriesList + frl storage.FieldResultList + timestamps []int64 + sortedValues [][]byte + i int +} + +type segResultHeap []*segResult + +func (h segResultHeap) Len() int { return len(h) } +func (h segResultHeap) Less(i, j int) bool { + return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0 +} +func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *segResultHeap) Push(x interface{}) { + *h = 
append(*h, x.(*segResult)) +} + +func (h *segResultHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x } diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go index 0bb418e5..5aa0e10e 100644 --- a/banyand/stream/benchmark_test.go +++ b/banyand/stream/benchmark_test.go @@ -264,7 +264,7 @@ func generateStream(db storage.TSDB[*tsTable, option]) *stream { } } -func generateStreamQueryOptions(p parameter, index mockIndex) model.StreamQueryOptions { +func generateStreamQueryOptions(p parameter, midx mockIndex) model.StreamQueryOptions { timeRange := timestamp.TimeRange{ Start: time.Unix(int64(p.startTimestamp), 0), End: time.Unix(int64(p.endTimestamp), 0), @@ -287,7 +287,7 @@ func generateStreamQueryOptions(p parameter, index mockIndex) model.StreamQueryO num := generateRandomNumber(int64(p.tagCardinality)) value := filterTagValuePrefix + strconv.Itoa(num) filter := mockFilter{ - index: index, + index: midx, value: value, } indexRule := &databasev1.IndexRule{ @@ -297,7 +297,7 @@ func generateStreamQueryOptions(p parameter, index mockIndex) model.StreamQueryO Tags: []string{"filter-tag"}, Type: databasev1.IndexRule_TYPE_INVERTED, } - order := &model.OrderBy{ + order := &index.OrderBy{ Index: indexRule, Sort: modelv1.Sort_SORT_ASC, } diff --git a/pkg/index/index.go b/pkg/index/index.go index fc1b8f5c..88485605 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -174,7 +174,7 @@ type Writer interface { // FieldIterable allows building a FieldIterator. type FieldIterable interface { - BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) (Query, error) + BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query, timeRange *timestamp.TimeRange) (Query, error) Iterator(ctx context.Context, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator[*DocumentResult], err error) Sort(ctx context.Context, sids []common.SeriesID, fieldKey FieldKey, @@ -206,11 +206,10 @@ type Store interface { // Series represents a series in an index. type Series struct { EntityValues []byte - ID common.SeriesID } func (s Series) String() string { - return fmt.Sprintf("%s:%d", s.EntityValues, s.ID) + return convert.BytesToString(s.EntityValues) } // SortedField returns the value of the sorted field. @@ -225,14 +224,33 @@ type SeriesDocument struct { Timestamp int64 } +// OrderByType is the type of order by. +type OrderByType int + +const ( + // OrderByTypeTime is the order by time. + OrderByTypeTime OrderByType = iota + // OrderByTypeIndex is the order by index. + OrderByTypeIndex + // OrderByTypeSeries is the order by series. + OrderByTypeSeries +) + +// OrderBy is the order by rule. +type OrderBy struct { + Index *databasev1.IndexRule + Sort modelv1.Sort + Type OrderByType +} + // SeriesStore is an abstract of a series repository. type SeriesStore interface { Store // Search returns a list of series that match the given matchers. Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error) SeriesIterator(context.Context) (FieldIterator[Series], error) - SeriesSort(ctx context.Context, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, - preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error) + SeriesSort(ctx context.Context, indexQuery Query, orderBy *OrderBy, + preLoadSize int, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error) } // SeriesMatcherType represents the type of series matcher. 
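For orientation, a sketch of how the reworked SeriesStore interface is meant to be driven, mirroring the new SeriesSort tests later in this commit; the function name, preload size, and nil field keys are illustrative, and imports are elided:

    // sortSeriesByTime iterates matching series ordered by timestamp,
    // pushing the optional time range into the query itself.
    func sortSeriesByTime(ctx context.Context, store index.SeriesStore,
        matchers []index.SeriesMatcher, tr *timestamp.TimeRange,
    ) error {
        orderBy := &index.OrderBy{
            Type: index.OrderByTypeTime,
            Sort: modelv1.Sort_SORT_ASC,
        }
        // BuildQuery now accepts the time range alongside the secondary query.
        query, err := store.BuildQuery(matchers, nil, tr)
        if err != nil {
            return err
        }
        // SeriesSort takes the prebuilt query and an OrderBy instead of a
        // field key plus term range.
        iter, err := store.SeriesSort(ctx, query, orderBy, 10, nil)
        if err != nil {
            return err
        }
        defer func() { _ = iter.Close() }()
        for iter.Next() {
            val := iter.Val() // DocumentResult: EntityValues, Timestamp, SortedValue, Values
            _ = val
        }
        return nil
    }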
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 084ef729..4772f8b8 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -48,10 +48,8 @@ import ( const ( docIDField = "_id" batchSize = 1024 - seriesIDField = "series_id" - entityField = "entity" - idField = "id" - timestampField = "timestamp" + seriesIDField = "_series_id" + timestampField = "_timestamp" ) var ( @@ -317,22 +315,24 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti } type blugeMatchIterator struct { - delegated search.DocumentMatchIterator - err error - closer io.Closer - ctx *search.Context - current index.DocumentResult - hit int + delegated search.DocumentMatchIterator + err error + closer io.Closer + ctx *search.Context + loadDocValues []string + current index.DocumentResult + hit int } func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer, - _ []string, + loadDocValues []string, ) blugeIterator { bmi := &blugeMatchIterator{ - delegated: delegated, - closer: closer, - current: index.DocumentResult{}, - ctx: search.NewSearchContext(1, 0), + delegated: delegated, + closer: closer, + current: index.DocumentResult{}, + ctx: search.NewSearchContext(1, 0), + loadDocValues: loadDocValues, } return bmi } @@ -359,9 +359,23 @@ func (bmi *blugeMatchIterator) Next() bool { if len(match.SortValue) > 0 { bmi.current.SortedValue = match.SortValue[0] } - err := match.VisitStoredFields(bmi.setVal) - bmi.err = multierr.Combine(bmi.err, err) - return bmi.err == nil + if len(bmi.loadDocValues) == 0 { + err := match.VisitStoredFields(bmi.setVal) + bmi.err = multierr.Combine(bmi.err, err) + return bmi.err == nil + } + if err := match.LoadDocumentValues(bmi.ctx, bmi.loadDocValues); err != nil { + bmi.err = multierr.Combine(bmi.err, err) + return false + } + for _, dv := range bmi.loadDocValues { + vv := match.DocValues(dv) + if len(vv) == 0 { + continue + } + bmi.setVal(dv, vv[0]) + } + return true } func (bmi *blugeMatchIterator) setVal(field string, value []byte) bool { diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 9d0b15ed..59feb2c5 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -30,10 +30,10 @@ import ( "github.com/pkg/errors" "go.uber.org/multierr" - "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var emptySeries = make([]index.SeriesDocument, 0) @@ -49,12 +49,15 @@ func (s *store) SeriesBatch(batch index.Batch) error { b := generateBatch() defer releaseBatch(b) for _, d := range batch.Documents { - doc := bluge.NewDocument(convert.BytesToString(convert.Uint64ToBytes(d.DocID))) + doc := bluge.NewDocument(convert.BytesToString(d.EntityValues)) for _, f := range d.Fields { tf := bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.Term) - if !f.NoSort { + if !f.Index { + tf.FieldOptions = 0 + } else if !f.NoSort { tf.Sortable() } + if f.Store { tf.StoreValue() } @@ -64,7 +67,6 @@ func (s *store) SeriesBatch(batch index.Batch) error { doc.AddField(tf) } - doc.AddField(bluge.NewKeywordFieldBytes(entityField, d.EntityValues).StoreValue()) if d.Timestamp > 0 { doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue()) } @@ -78,7 +80,7 @@ func (s *store) 
SeriesBatch(batch index.Batch) error { } // BuildQuery implements index.SeriesStore. -func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery index.Query) (index.Query, error) { +func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery index.Query, timeRange *timestamp.TimeRange) (index.Query, error) { if len(seriesMatchers) == 0 { return secondaryQuery, nil } @@ -90,19 +92,19 @@ func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery case index.SeriesMatcherTypeExact: match := convert.BytesToString(seriesMatchers[i].Match) q := bluge.NewTermQuery(match) - q.SetField(entityField) + q.SetField(docIDField) qs[i] = q nodes = append(nodes, newTermNode(match, nil)) case index.SeriesMatcherTypePrefix: match := convert.BytesToString(seriesMatchers[i].Match) q := bluge.NewPrefixQuery(match) - q.SetField(entityField) + q.SetField(docIDField) qs[i] = q nodes = append(nodes, newPrefixNode(match)) case index.SeriesMatcherTypeWildcard: match := convert.BytesToString(seriesMatchers[i].Match) q := bluge.NewWildcardQuery(match) - q.SetField(entityField) + q.SetField(docIDField) qs[i] = q nodes = append(nodes, newWildcardNode(match)) default: @@ -132,6 +134,12 @@ func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery query.AddMust(secondaryQuery.(*queryNode).query) node.Append(secondaryQuery.(*queryNode).node) } + if timeRange != nil { + q := bluge.NewDateRangeInclusiveQuery(timeRange.Start, timeRange.End, timeRange.IncludeStart, timeRange.IncludeEnd) + q.SetField(timestampField) + query.AddMust(q) + node.Append(newTimeRangeNode(timeRange)) + } return &queryNode{query, node}, nil } @@ -164,7 +172,6 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey if err != nil { return nil, errors.WithMessage(err, "iterate document match iterator") } - docIDMap := make(map[uint64]struct{}) fields := make([]string, 0, len(loadedFields)) for i := range loadedFields { fields = append(fields, loadedFields[i].Marshal()) @@ -179,16 +186,19 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey doc.Fields[fields[i]] = nil } } + var errTime error err = next.VisitStoredFields(func(field string, value []byte) bool { switch field { case docIDField: - id := convert.BytesToUint64(value) - if _, ok := docIDMap[id]; !ok { - doc.Key.ID = common.SeriesID(convert.BytesToUint64(value)) - docIDMap[id] = struct{}{} - } - case entityField: doc.Key.EntityValues = value + case timestampField: + var ts time.Time + ts, errTime = bluge.DecodeDateTime(value) + if errTime != nil { + err = errTime + return false + } + doc.Timestamp = ts.UnixNano() default: if _, ok := doc.Fields[field]; ok { doc.Fields[field] = bytes.Clone(value) @@ -196,10 +206,10 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey } return true }) - if err != nil { + if err = multierr.Combine(err, errTime); err != nil { return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber) } - if doc.Key.ID > 0 { + if len(doc.Key.EntityValues) > 0 { result = append(result, doc) } next, err = dmi.Next() @@ -210,58 +220,39 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey return result, nil } -func (s *store) SeriesSort(ctx context.Context, fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, - preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey, +func (s *store) SeriesSort(ctx context.Context, indexQuery index.Query, 
orderBy *index.OrderBy, + preLoadSize int, fieldKeys []index.FieldKey, ) (iter index.FieldIterator[*index.DocumentResult], err error) { - if termRange.Lower != nil && - termRange.Upper != nil && - bytes.Compare(termRange.Lower, termRange.Upper) > 0 { - return index.DummyFieldIterator, nil + var sortedKey string + switch orderBy.Type { + case index.OrderByTypeTime: + sortedKey = timestampField + case index.OrderByTypeIndex: + fieldKey := index.FieldKey{ + IndexRuleID: orderBy.Index.Metadata.Id, + } + sortedKey = fieldKey.Marshal() + default: + return nil, errors.Errorf("unsupported order by type: %v", orderBy.Type) + } + if orderBy.Sort == modelv1.Sort_SORT_DESC { + sortedKey = "-" + sortedKey } + fields := make([]string, 0, len(fieldKeys)) + for i := range fieldKeys { + fields = append(fields, fieldKeys[i].Marshal()) + } + if !s.closer.AddRunning() { return nil, nil } - reader, err := s.writer.Reader() if err != nil { return nil, err } - fk := fieldKey.Marshal() - if termRange.Upper == nil { - termRange.Upper = defaultUpper - } - if termRange.Lower == nil { - termRange.Lower = defaultLower - } - rangeQuery := bluge.NewBooleanQuery() - rangeQuery.AddMust(bluge.NewTermRangeInclusiveQuery( - string(termRange.Lower), - string(termRange.Upper), - termRange.IncludesLower, - termRange.IncludesUpper, - ). - SetField(fk)) - rangeNode := newMustNode() - rangeNode.Append(newTermRangeInclusiveNode(string(termRange.Lower), string(termRange.Upper), termRange.IncludesLower, termRange.IncludesUpper, nil)) - - sortedKey := fk - if order == modelv1.Sort_SORT_DESC { - sortedKey = "-" + sortedKey - } - query := bluge.NewBooleanQuery().AddMust(rangeQuery) - node := newMustNode() - node.Append(rangeNode) - if indexQuery != nil && indexQuery.(*queryNode).query != nil { - query.AddMust(indexQuery.(*queryNode).query) - node.Append(indexQuery.(*queryNode).node) - } - fields := make([]string, 0, len(fieldKeys)) - for i := range fieldKeys { - fields = append(fields, fieldKeys[i].Marshal()) - } - result := &sortIterator{ - query: &queryNode{query, node}, + return &sortIterator{ + query: indexQuery, fields: fields, reader: reader, sortedKey: sortedKey, @@ -269,13 +260,11 @@ func (s *store) SeriesSort(ctx context.Context, fieldKey index.FieldKey, termRan closer: s.closer, ctx: ctx, newIterator: newSeriesIterator, - } - return result, nil + }, nil } type seriesIterator struct { *blugeMatchIterator - needToLoadFields []string } func newSeriesIterator(delegated search.DocumentMatchIterator, closer io.Closer, @@ -288,7 +277,6 @@ func newSeriesIterator(delegated search.DocumentMatchIterator, closer io.Closer, ctx: search.NewSearchContext(1, 0), current: index.DocumentResult{Values: make(map[string][]byte, len(needToLoadFields))}, }, - needToLoadFields: append(needToLoadFields, entityField, docIDField, seriesIDField, timestampField), } for _, f := range needToLoadFields { si.current.Values[f] = nil @@ -312,7 +300,6 @@ func (si *seriesIterator) Next() bool { si.current.Values[i] = nil } si.current.DocID = 0 - si.current.SeriesID = 0 si.current.Timestamp = 0 si.current.SortedValue = nil if len(match.SortValue) > 0 { @@ -329,10 +316,8 @@ func (si *seriesIterator) Next() bool { func (si *seriesIterator) setVal(field string, value []byte) bool { switch field { - case entityField: - si.current.EntityValues = value case docIDField: - si.current.DocID = convert.BytesToUint64(value) + si.current.EntityValues = value case timestampField: ts, errTime := bluge.DecodeDateTime(value) if errTime != nil { @@ -356,7 +341,7 @@ func (s *store) 
SeriesIterator(ctx context.Context) (index.FieldIterator[index.S defer func() { _ = reader.Close() }() - dict, err := reader.DictionaryIterator(entityField, nil, nil, nil) + dict, err := reader.DictionaryIterator(docIDField, nil, nil, nil) if err != nil { return nil, err } diff --git a/pkg/index/inverted/inverted_series_test.go b/pkg/index/inverted/inverted_series_test.go index cd0a3bd4..ac546ca9 100644 --- a/pkg/index/inverted/inverted_series_test.go +++ b/pkg/index/inverted/inverted_series_test.go @@ -19,15 +19,22 @@ package inverted import ( "context" + "maps" "testing" + "time" + "github.com/blugelabs/bluge" + "github.com/blugelabs/bluge/numeric" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/apache/skywalking-banyandb/api/common" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( @@ -37,6 +44,9 @@ var ( fieldKeyServiceName = index.FieldKey{ IndexRuleID: 6, } + fieldKeyStartTime = index.FieldKey{ + IndexRuleID: 21, + } ) func TestStore_Search(t *testing.T) { @@ -66,7 +76,6 @@ func TestStore_Search(t *testing.T) { want: []index.SeriesDocument{ { Key: index.Series{ - ID: common.SeriesID(1), EntityValues: []byte("test1"), }, }, @@ -78,33 +87,32 @@ func TestStore_Search(t *testing.T) { want: []index.SeriesDocument{ { Key: index.Series{ - ID: common.SeriesID(2), EntityValues: []byte("test2"), }, Fields: map[string][]byte{ fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(100)), fieldKeyServiceName.Marshal(): []byte("svc2"), }, + Timestamp: int64(101), }, { Key: index.Series{ - ID: common.SeriesID(3), - EntityValues: []byte("test3"), + EntityValues: []byte("test1"), }, Fields: map[string][]byte{ - fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(500)), + fieldKeyDuration.Marshal(): nil, fieldKeyServiceName.Marshal(): nil, }, }, { Key: index.Series{ - ID: common.SeriesID(1), - EntityValues: []byte("test1"), + EntityValues: []byte("test3"), }, Fields: map[string][]byte{ - fieldKeyDuration.Marshal(): nil, + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(500)), fieldKeyServiceName.Marshal(): nil, }, + Timestamp: int64(1001), }, }, }, @@ -114,30 +122,30 @@ func TestStore_Search(t *testing.T) { want: []index.SeriesDocument{ { Key: index.Series{ - ID: common.SeriesID(2), EntityValues: []byte("test2"), }, Fields: map[string][]byte{ fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(100)), }, + Timestamp: int64(101), }, + { Key: index.Series{ - ID: common.SeriesID(3), - EntityValues: []byte("test3"), + EntityValues: []byte("test1"), }, Fields: map[string][]byte{ - fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(500)), + fieldKeyDuration.Marshal(): nil, }, }, { Key: index.Series{ - ID: common.SeriesID(1), - EntityValues: []byte("test1"), + EntityValues: []byte("test3"), }, Fields: map[string][]byte{ - fieldKeyDuration.Marshal(): nil, + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(500)), }, + Timestamp: int64(1001), }, }, }, @@ -147,17 +155,16 @@ func TestStore_Search(t *testing.T) { want: []index.SeriesDocument{ { Key: index.Series{ - ID: common.SeriesID(2), EntityValues: []byte("test2"), }, Fields: map[string][]byte{ 
fieldKeyServiceName.Marshal(): []byte("svc2"), }, + Timestamp: int64(101), }, { Key: index.Series{ - ID: common.SeriesID(3), - EntityValues: []byte("test3"), + EntityValues: []byte("test1"), }, Fields: map[string][]byte{ fieldKeyServiceName.Marshal(): nil, @@ -165,12 +172,12 @@ func TestStore_Search(t *testing.T) { }, { Key: index.Series{ - ID: common.SeriesID(1), - EntityValues: []byte("test1"), + EntityValues: []byte("test3"), }, Fields: map[string][]byte{ fieldKeyServiceName.Marshal(): nil, }, + Timestamp: int64(1001), }, }, }, @@ -191,7 +198,7 @@ func TestStore_Search(t *testing.T) { name += string(term) + "-" } t.Run(name, func(t *testing.T) { - query, err := s.BuildQuery(matchers, nil) + query, err := s.BuildQuery(matchers, nil, nil) require.NotEmpty(t, query.String()) require.NoError(t, err) got, err := s.Search(context.Background(), tt.projection, query) @@ -228,21 +235,26 @@ func TestStore_SearchWildcard(t *testing.T) { want: []index.SeriesDocument{ { Key: index.Series{ - ID: common.SeriesID(1), EntityValues: []byte("test1"), }, }, { Key: index.Series{ - ID: common.SeriesID(2), EntityValues: []byte("test2"), }, + Timestamp: int64(101), }, { Key: index.Series{ - ID: common.SeriesID(3), EntityValues: []byte("test3"), }, + Timestamp: int64(1001), + }, + { + Key: index.Series{ + EntityValues: []byte("test4"), + }, + Timestamp: int64(2001), }, }, }, @@ -251,9 +263,9 @@ func TestStore_SearchWildcard(t *testing.T) { want: []index.SeriesDocument{ { Key: index.Series{ - ID: common.SeriesID(2), EntityValues: []byte("test2"), }, + Timestamp: int64(101), }, }, }, @@ -262,7 +274,6 @@ func TestStore_SearchWildcard(t *testing.T) { want: []index.SeriesDocument{ { Key: index.Series{ - ID: common.SeriesID(1), EntityValues: []byte("test1"), }, }, @@ -281,7 +292,7 @@ func TestStore_SearchWildcard(t *testing.T) { Type: index.SeriesMatcherTypeWildcard, Match: tt.wildcard, }, - }, nil) + }, nil, nil) require.NoError(t, err) require.NotEmpty(t, query.String()) got, err := s.Search(context.Background(), tt.projection, query) @@ -318,21 +329,26 @@ func TestStore_SearchPrefix(t *testing.T) { want: []index.SeriesDocument{ { Key: index.Series{ - ID: common.SeriesID(1), EntityValues: []byte("test1"), }, }, { Key: index.Series{ - ID: common.SeriesID(2), EntityValues: []byte("test2"), }, + Timestamp: int64(101), }, { Key: index.Series{ - ID: common.SeriesID(3), EntityValues: []byte("test3"), }, + Timestamp: int64(1001), + }, + { + Key: index.Series{ + EntityValues: []byte("test4"), + }, + Timestamp: int64(2001), }, }, }, @@ -349,7 +365,7 @@ func TestStore_SearchPrefix(t *testing.T) { Type: index.SeriesMatcherTypePrefix, Match: tt.prefix, }, - }, nil) + }, nil, nil) require.NoError(t, err) require.NotEmpty(t, query.String()) got, err := s.Search(context.Background(), tt.projection, query) @@ -359,41 +375,600 @@ func TestStore_SearchPrefix(t *testing.T) { } } +func TestStore_SearchWithSecondaryQuery(t *testing.T) { + tester := require.New(t) + path, fn := setUp(tester) + s, err := NewStore(StoreOpts{ + Path: path, + Logger: logger.GetLogger("test"), + }) + tester.NoError(err) + defer func() { + tester.NoError(s.Close()) + fn() + }() + + // Setup some data + setupData(tester, s) + + // Define the secondary query + secondaryQuery := &queryNode{ + query: bluge.NewTermQuery("svc2").SetField(fieldKeyServiceName.Marshal()), + node: newTermNode("svc2", nil), + } + + // Test cases + tests := []struct { + term [][]byte + want []index.SeriesDocument + projection []index.FieldKey + }{ + { + term: [][]byte{[]byte("test2")}, + 
projection: []index.FieldKey{fieldKeyServiceName, fieldKeyDuration, {TagName: "short_name"}}, + want: []index.SeriesDocument{ + { + Key: index.Series{ + EntityValues: []byte("test2"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(100)), + fieldKeyServiceName.Marshal(): []byte("svc2"), + "short_name": []byte("t2"), + }, + Timestamp: int64(101), + }, + }, + }, + { + term: [][]byte{[]byte("test3")}, + projection: []index.FieldKey{fieldKeyServiceName, fieldKeyDuration}, + want: []index.SeriesDocument{}, + }, + { + term: [][]byte{[]byte("test1")}, + projection: []index.FieldKey{fieldKeyServiceName, fieldKeyDuration}, + want: []index.SeriesDocument{}, + }, + { + term: [][]byte{[]byte("test2"), []byte("test3")}, + projection: []index.FieldKey{fieldKeyServiceName, fieldKeyDuration, {TagName: "short_name"}}, + want: []index.SeriesDocument{ + { + Key: index.Series{ + EntityValues: []byte("test2"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(100)), + fieldKeyServiceName.Marshal(): []byte("svc2"), + "short_name": []byte("t2"), + }, + Timestamp: int64(101), + }, + }, + }, + { + term: [][]byte{[]byte("test1"), []byte("test2")}, + projection: []index.FieldKey{fieldKeyServiceName, fieldKeyDuration}, + want: []index.SeriesDocument{ + { + Key: index.Series{ + EntityValues: []byte("test2"), + }, + Fields: map[string][]byte{ + fieldKeyDuration.Marshal(): convert.Int64ToBytes(int64(100)), + fieldKeyServiceName.Marshal(): []byte("svc2"), + }, + Timestamp: int64(101), + }, + }, + }, + } + + for _, tt := range tests { + var matchers []index.SeriesMatcher + var name string + for _, term := range tt.term { + matchers = append(matchers, index.SeriesMatcher{ + Type: index.SeriesMatcherTypeExact, + Match: term, + }) + name += string(term) + "-" + } + t.Run(name, func(t *testing.T) { + query, err := s.BuildQuery(matchers, secondaryQuery, nil) + require.NotEmpty(t, query.String()) + require.NoError(t, err) + got, err := s.Search(context.Background(), tt.projection, query) + require.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestStore_SeriesSort(t *testing.T) { + tester := require.New(t) + path, fn := setUp(tester) + s, err := NewStore(StoreOpts{ + Path: path, + Logger: logger.GetLogger("test"), + }) + tester.NoError(err) + defer func() { + tester.NoError(s.Close()) + fn() + }() + + // Setup some data + setupData(tester, s) + + // Define the order by field + orderBy := &index.OrderBy{ + Index: &databasev1.IndexRule{ + Metadata: &commonv1.Metadata{ + Id: fieldKeyStartTime.IndexRuleID, + }, + }, + Sort: modelv1.Sort_SORT_ASC, + Type: index.OrderByTypeIndex, + } + + // Test cases + tests := []struct { + name string + orderBy *index.OrderBy + timeRange *timestamp.TimeRange + want []index.DocumentResult + fieldKeys []index.FieldKey + }{ + { + name: "Sort by start_time ascending", + orderBy: orderBy, + fieldKeys: []index.FieldKey{fieldKeyStartTime}, + want: []index.DocumentResult{ + { + EntityValues: []byte("test2"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(100)), + }, + SortedValue: convert.Int64ToBytes(int64(100)), + }, + { + EntityValues: []byte("test3"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(1000)), + }, + SortedValue: convert.Int64ToBytes(int64(1000)), + }, + { + EntityValues: []byte("test4"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(2000)), + }, + SortedValue: 
convert.Int64ToBytes(int64(2000)), + }, + { + EntityValues: []byte("test1"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): nil, + }, + SortedValue: []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + }, + }, + { + name: "Sort by start_time descending", + orderBy: &index.OrderBy{ + Index: &databasev1.IndexRule{ + Metadata: &commonv1.Metadata{ + Id: fieldKeyStartTime.IndexRuleID, + }, + }, + Sort: modelv1.Sort_SORT_DESC, + Type: index.OrderByTypeIndex, + }, + fieldKeys: []index.FieldKey{fieldKeyStartTime}, + want: []index.DocumentResult{ + { + EntityValues: []byte("test4"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(2000)), + }, + SortedValue: convert.Int64ToBytes(int64(2000)), + }, + { + EntityValues: []byte("test3"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(1000)), + }, + SortedValue: convert.Int64ToBytes(int64(1000)), + }, + { + EntityValues: []byte("test2"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(100)), + }, + SortedValue: convert.Int64ToBytes(int64(100)), + }, + { + EntityValues: []byte("test1"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): nil, + }, + SortedValue: []byte{0x00}, + }, + }, + }, + { + name: "Sort by start_time ascending with time range 100 to 1000", + orderBy: orderBy, + fieldKeys: []index.FieldKey{fieldKeyStartTime}, + timeRange: func() *timestamp.TimeRange { + tr := timestamp.NewInclusiveTimeRange(time.Unix(0, 100), time.Unix(0, 1000)) + return &tr + }(), + want: []index.DocumentResult{ + { + EntityValues: []byte("test2"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(100)), + }, + SortedValue: convert.Int64ToBytes(int64(100)), + }, + { + EntityValues: []byte("test3"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(1000)), + }, + SortedValue: convert.Int64ToBytes(int64(1000)), + }, + }, + }, + { + name: "Sort by start_time ascending with time range 0 to 2000", + orderBy: orderBy, + fieldKeys: []index.FieldKey{fieldKeyStartTime}, + timeRange: func() *timestamp.TimeRange { + tr := timestamp.NewInclusiveTimeRange(time.Unix(0, 0), time.Unix(0, 2000)) + return &tr + }(), + want: []index.DocumentResult{ + { + EntityValues: []byte("test2"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(100)), + }, + SortedValue: convert.Int64ToBytes(int64(100)), + }, + { + EntityValues: []byte("test3"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(1000)), + }, + SortedValue: convert.Int64ToBytes(int64(1000)), + }, + { + EntityValues: []byte("test4"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(2000)), + }, + SortedValue: convert.Int64ToBytes(int64(2000)), + }, + }, + }, + { + name: "Sort by start_time ascending with time range 500 to 1500", + orderBy: orderBy, + fieldKeys: []index.FieldKey{fieldKeyStartTime}, + timeRange: func() *timestamp.TimeRange { + tr := timestamp.NewInclusiveTimeRange(time.Unix(0, 500), time.Unix(0, 1500)) + return &tr + }(), + want: []index.DocumentResult{ + { + EntityValues: []byte("test3"), + Values: map[string][]byte{ + fieldKeyStartTime.Marshal(): convert.Int64ToBytes(int64(1000)), + }, + SortedValue: convert.Int64ToBytes(int64(1000)), + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var secondaryQuery index.Query + if tt.timeRange != 
nil { + secondaryQuery = &queryNode{ + query: bluge.NewTermRangeInclusiveQuery( + string(convert.Int64ToBytes(tt.timeRange.Start.Local().UnixNano())), + string(convert.Int64ToBytes(tt.timeRange.End.Local().UnixNano())), + tt.timeRange.IncludeStart, + tt.timeRange.IncludeEnd, + ).SetField(fieldKeyStartTime.Marshal()), + } + } + query, err := s.BuildQuery([]index.SeriesMatcher{ + { + Type: index.SeriesMatcherTypePrefix, + Match: []byte("test"), + }, + }, secondaryQuery, nil) + require.NoError(t, err) + iter, err := s.SeriesSort(context.Background(), query, tt.orderBy, 10, tt.fieldKeys) + require.NoError(t, err) + defer iter.Close() + + var got []index.DocumentResult + for iter.Next() { + var g index.DocumentResult + val := iter.Val() + g.DocID = val.DocID + g.EntityValues = val.EntityValues + g.Values = maps.Clone(val.Values) + g.SortedValue = val.SortedValue + got = append(got, g) + } + assert.Equal(t, tt.want, got) + }) + } +} + +func TestStore_TimestampSort(t *testing.T) { + tester := require.New(t) + path, fn := setUp(tester) + s, err := NewStore(StoreOpts{ + Path: path, + Logger: logger.GetLogger("test"), + }) + tester.NoError(err) + defer func() { + tester.NoError(s.Close()) + fn() + }() + + // Setup some data + setupData(tester, s) + + // Define the order by field + orderBy := &index.OrderBy{ + Type: index.OrderByTypeTime, + Sort: modelv1.Sort_SORT_ASC, + } + + // Test cases + tests := []struct { + name string + orderBy *index.OrderBy + timeRange *timestamp.TimeRange + want []index.DocumentResult + fieldKeys []index.FieldKey + }{ + { + name: "Sort by timestamp ascending", + orderBy: orderBy, + want: []index.DocumentResult{ + { + EntityValues: []byte("test2"), + Timestamp: int64(101), + SortedValue: numeric.MustNewPrefixCodedInt64(101, 0), + }, + { + EntityValues: []byte("test3"), + Timestamp: int64(1001), + SortedValue: numeric.MustNewPrefixCodedInt64(1001, 0), + }, + { + EntityValues: []byte("test4"), + Timestamp: int64(2001), + SortedValue: numeric.MustNewPrefixCodedInt64(2001, 0), + }, + { + EntityValues: []byte("test1"), + Timestamp: 0, + SortedValue: []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + }, + }, + { + name: "Sort by timestamp descending", + orderBy: &index.OrderBy{ + Type: index.OrderByTypeTime, + Sort: modelv1.Sort_SORT_DESC, + }, + want: []index.DocumentResult{ + { + EntityValues: []byte("test4"), + Timestamp: int64(2001), + SortedValue: numeric.MustNewPrefixCodedInt64(2001, 0), + }, + { + EntityValues: []byte("test3"), + Timestamp: int64(1001), + SortedValue: numeric.MustNewPrefixCodedInt64(1001, 0), + }, + { + EntityValues: []byte("test2"), + Timestamp: int64(101), + SortedValue: numeric.MustNewPrefixCodedInt64(101, 0), + }, + { + EntityValues: []byte("test1"), + Timestamp: 0, + SortedValue: []byte{0x00}, + }, + }, + }, + { + name: "Sort by timestamp ascending with time range 100 to 1000", + orderBy: orderBy, + timeRange: func() *timestamp.TimeRange { + tr := timestamp.NewInclusiveTimeRange(time.Unix(0, 100), time.Unix(0, 1000)) + return &tr + }(), + want: []index.DocumentResult{ + { + EntityValues: []byte("test2"), + Timestamp: int64(101), + SortedValue: numeric.MustNewPrefixCodedInt64(101, 0), + }, + }, + }, + { + name: "Sort by timestamp ascending with time range 0 to 2000", + orderBy: orderBy, + timeRange: func() *timestamp.TimeRange { + tr := timestamp.NewInclusiveTimeRange(time.Unix(0, 0), time.Unix(0, 2000)) + return &tr + }(), + want: []index.DocumentResult{ + { + EntityValues: []byte("test2"), + Timestamp: int64(101), + 
SortedValue: numeric.MustNewPrefixCodedInt64(101, 0), + }, + { + EntityValues: []byte("test3"), + Timestamp: int64(1001), + SortedValue: numeric.MustNewPrefixCodedInt64(1001, 0), + }, + }, + }, + { + name: "Sort by timestamp ascending with time range 500 to 1500", + orderBy: orderBy, + timeRange: func() *timestamp.TimeRange { + tr := timestamp.NewInclusiveTimeRange(time.Unix(0, 500), time.Unix(0, 1500)) + return &tr + }(), + want: []index.DocumentResult{ + { + EntityValues: []byte("test3"), + Timestamp: int64(1001), + SortedValue: numeric.MustNewPrefixCodedInt64(1001, 0), + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query, err := s.BuildQuery([]index.SeriesMatcher{ + { + Type: index.SeriesMatcherTypePrefix, + Match: []byte("test"), + }, + }, nil, tt.timeRange) + require.NoError(t, err) + iter, err := s.SeriesSort(context.Background(), query, tt.orderBy, 10, tt.fieldKeys) + require.NoError(t, err) + defer iter.Close() + + var got []index.DocumentResult + for iter.Next() { + var g index.DocumentResult + val := iter.Val() + g.DocID = val.DocID + g.EntityValues = val.EntityValues + if len(val.Values) > 0 { + g.Values = maps.Clone(val.Values) + } + g.SortedValue = val.SortedValue + g.Timestamp = val.Timestamp + got = append(got, g) + } + assert.Equal(t, tt.want, got) + }) + } +} + func setupData(tester *require.Assertions, s index.SeriesStore) { series1 := index.Document{ - DocID: 1, EntityValues: []byte("test1"), } series2 := index.Document{ - DocID: 2, EntityValues: []byte("test2"), Fields: []index.Field{ { Key: fieldKeyDuration, Term: convert.Int64ToBytes(int64(100)), Store: true, + Index: true, }, { Key: fieldKeyServiceName, Term: []byte("svc2"), Store: true, + Index: true, + }, + { + Key: fieldKeyStartTime, + Term: convert.Int64ToBytes(int64(100)), + Store: true, + Index: true, + }, + { + Key: index.FieldKey{ + TagName: "short_name", + }, + Term: []byte("t2"), + Store: true, + Index: false, }, }, + Timestamp: int64(101), } series3 := index.Document{ - DocID: 3, EntityValues: []byte("test3"), Fields: []index.Field{ { Key: fieldKeyDuration, Term: convert.Int64ToBytes(int64(500)), Store: true, + Index: true, + }, + { + Key: fieldKeyStartTime, + Term: convert.Int64ToBytes(int64(1000)), + Store: true, + Index: true, + }, + { + Key: index.FieldKey{ + TagName: "short_name", + }, + Term: []byte("t3"), + Store: true, + Index: false, }, }, + Timestamp: int64(1001), } + series4 := index.Document{ + EntityValues: []byte("test4"), + Fields: []index.Field{ + { + Key: fieldKeyDuration, + Term: convert.Int64ToBytes(int64(500)), + Store: true, + Index: true, + }, + { + Key: fieldKeyStartTime, + Term: convert.Int64ToBytes(int64(2000)), + Store: true, + Index: true, + }, + }, + Timestamp: int64(2001), + } + tester.NoError(s.SeriesBatch(index.Batch{ + Documents: []index.Document{series1, series2, series4, series3}, + })) tester.NoError(s.SeriesBatch(index.Batch{ - Documents: []index.Document{series1, series2, series3, series3}, + Documents: []index.Document{series3}, })) } diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go index f4487352..2dc63093 100644 --- a/pkg/index/inverted/query.go +++ b/pkg/index/inverted/query.go @@ -31,6 +31,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/query/logical" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( @@ -466,3 +467,23 @@ func (m *wildcardNode) MarshalJSON() ([]byte, error) { 
func (m *wildcardNode) String() string { return convert.JSONToString(m) } + +type timeRangeNode struct { + timeRange *timestamp.TimeRange +} + +func newTimeRangeNode(timeRange *timestamp.TimeRange) *timeRangeNode { + return &timeRangeNode{ + timeRange: timeRange, + } +} + +func (t *timeRangeNode) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["time_range"] = t.timeRange.String() + return json.Marshal(data) +} + +func (t *timeRangeNode) String() string { + return convert.JSONToString(t) +} diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go index a0e76827..542343d9 100644 --- a/pkg/pb/v1/series.go +++ b/pkg/pb/v1/series.go @@ -72,6 +72,7 @@ func (s *Series) MarshalWithWildcard() error { // Unmarshal decodes series from internal Buffer. func (s *Series) Unmarshal(src []byte) error { + s.ID = common.SeriesID(convert.Hash(src)) var err error s.Buffer = s.Buffer[:0] if s.Buffer, src, err = unmarshalEntityValue(s.Buffer, src); err != nil { diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index bf11e111..4e2745f8 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -135,29 +135,33 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) { } func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, err error) { - var orderBy *model.OrderBy - orderByType := model.OrderByTypeTime + var orderBy *index.OrderBy + if i.order != nil { - if i.order.Index != nil { - orderByType = model.OrderByTypeIndex - } - orderBy = &model.OrderBy{ - Index: i.order.Index, + orderBy = &index.OrderBy{ Sort: i.order.Sort, + Index: i.order.Index, + } + if orderBy.Index == nil { + orderBy.Type = index.OrderByTypeTime + } else { + orderBy.Type = index.OrderByTypeIndex } } if i.groupByEntity { - orderByType = model.OrderByTypeSeries + if orderBy == nil { + orderBy = &index.OrderBy{} + } + orderBy.Type = index.OrderByTypeSeries } ec := executor.FromMeasureExecutionContext(ctx) - ctx, stop := i.startSpan(ctx, query.GetTracer(ctx), orderByType, orderBy) + ctx, stop := i.startSpan(ctx, query.GetTracer(ctx), orderBy) defer stop(err) result, err := ec.Query(ctx, model.MeasureQueryOptions{ Name: i.metadata.GetName(), TimeRange: &i.timeRange, Entities: i.entities, Query: i.query, - OrderByType: orderByType, Order: orderBy, TagProjection: i.projectionTags, FieldProjection: i.projectionFields, @@ -269,20 +273,24 @@ func (ei *resultMIterator) Close() error { return ei.err } -func (i *localIndexScan) startSpan(ctx context.Context, tracer *query.Tracer, orderType model.OrderByType, orderBy *model.OrderBy) (context.Context, func(error)) { +func (i *localIndexScan) startSpan(ctx context.Context, tracer *query.Tracer, orderBy *index.OrderBy) (context.Context, func(error)) { if tracer == nil { return ctx, func(error) {} } span, ctx := tracer.StartSpan(ctx, "indexScan-%s", i.metadata) - sortName := modelv1.Sort_name[int32(orderBy.Sort)] - switch orderType { - case model.OrderByTypeTime: - span.Tag("orderBy", "time "+sortName) - case model.OrderByTypeIndex: - span.Tag("orderBy", fmt.Sprintf("indexRule:%s", orderBy.Index.Metadata.Name)) - case model.OrderByTypeSeries: - span.Tag("orderBy", "series") + if orderBy != nil { + sortName := modelv1.Sort_name[int32(orderBy.Sort)] + switch orderBy.Type { + case index.OrderByTypeTime: + span.Tag("orderBy", "time-"+sortName) + case index.OrderByTypeIndex: + span.Tag("orderBy", 
fmt.Sprintf("indexRule:%s-%s", orderBy.Index.Metadata.Name, sortName)) + case index.OrderByTypeSeries: + span.Tag("orderBy", "series") + } + } else { + span.Tag("orderBy", "time-asc(default)") } span.Tag("details", i.String()) diff --git a/pkg/query/logical/measure/topn_plan_localscan.go b/pkg/query/logical/measure/topn_plan_localscan.go index f068c156..9608af95 100644 --- a/pkg/query/logical/measure/topn_plan_localscan.go +++ b/pkg/query/logical/measure/topn_plan_localscan.go @@ -27,6 +27,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/executor" @@ -158,7 +159,7 @@ func (i *localScan) Execute(ctx context.Context) (mit executor.MIterator, err er Name: i.metadata.GetName(), TimeRange: &i.timeRange, Entities: [][]*modelv1.TagValue{i.entity}, - Order: &model.OrderBy{Sort: i.sort}, + Order: &index.OrderBy{Sort: i.sort}, TagProjection: i.projectionTags, FieldProjection: i.projectionFields, }) diff --git a/pkg/query/logical/optimizer.go b/pkg/query/logical/optimizer.go index a087cdbc..471f78b8 100644 --- a/pkg/query/logical/optimizer.go +++ b/pkg/query/logical/optimizer.go @@ -74,7 +74,8 @@ func NewPushDownOrder(order *modelv1.QueryOrder) PushDownOrder { // Optimize a Plan by pushing down the query order. func (pdo PushDownOrder) Optimize(plan Plan) (Plan, error) { if v, ok := plan.(Sorter); ok { - if order, err := ParseOrderBy(v.Schema(), pdo.order.GetIndexRuleName(), pdo.order.GetSort()); err == nil { + if order, err := ParseOrderBy(v.Schema(), + pdo.order.GetIndexRuleName(), pdo.order.GetSort()); err == nil && order != nil { v.Sort(order) } else { return nil, err diff --git a/pkg/query/logical/plan.go b/pkg/query/logical/plan.go index 5417b6d8..4d30a1f9 100644 --- a/pkg/query/logical/plan.go +++ b/pkg/query/logical/plan.go @@ -63,6 +63,9 @@ func (o *OrderBy) String() string { // ParseOrderBy parses an OrderBy from a Schema. 
func ParseOrderBy(s Schema, indexRuleName string, sort modelv1.Sort) (*OrderBy, error) { if indexRuleName == "" { + if sort == modelv1.Sort_SORT_UNSPECIFIED { + return nil, nil + } return &OrderBy{ Sort: sort, }, nil diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index f7db768a..eda67111 100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -81,9 +81,9 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro if i.result != nil { return BuildElementsFromStreamResult(ctx, i.result), nil } - var orderBy *model.OrderBy + var orderBy *index.OrderBy if i.order != nil { - orderBy = &model.OrderBy{ + orderBy = &index.OrderBy{ Index: i.order.Index, Sort: i.order.Sort, } diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index 022bcdbc..768e6fad 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -22,7 +22,6 @@ import ( "context" "github.com/apache/skywalking-banyandb/api/common" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -52,34 +51,15 @@ type TagProjection struct { Names []string } -// OrderBy is the order by rule. -type OrderBy struct { - Index *databasev1.IndexRule - Sort modelv1.Sort -} - -// OrderByType is the type of order by. -type OrderByType int - -const ( - // OrderByTypeTime is the order by time. - OrderByTypeTime OrderByType = iota - // OrderByTypeIndex is the order by index. - OrderByTypeIndex - // OrderByTypeSeries is the order by series. - OrderByTypeSeries -) - // MeasureQueryOptions is the options of a measure query. type MeasureQueryOptions struct { Query index.Query TimeRange *timestamp.TimeRange - Order *OrderBy + Order *index.OrderBy Name string Entities [][]*modelv1.TagValue TagProjection []TagProjection FieldProjection []string - OrderByType OrderByType } // MeasureResult is the result of a query. @@ -104,7 +84,7 @@ type StreamQueryOptions struct { TimeRange *timestamp.TimeRange Entities [][]*modelv1.TagValue Filter index.Filter - Order *OrderBy + Order *index.OrderBy TagProjection []TagProjection MaxElementSize int } diff --git a/pkg/test/measure/testdata/measures/service_traffic.json b/pkg/test/measure/testdata/measures/service_traffic.json index 7edc682c..887d1adb 100644 --- a/pkg/test/measure/testdata/measures/service_traffic.json +++ b/pkg/test/measure/testdata/measures/service_traffic.json @@ -39,5 +39,6 @@ "id" ] }, + "index_mode": true, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go index b0b92afd..f8db1652 100644 --- a/pkg/timestamp/range.go +++ b/pkg/timestamp/range.go @@ -60,6 +60,22 @@ func (t TimeRange) Overlapping(other TimeRange) bool { return !t.Start.After(other.End) && !other.Start.After(t.End) } +// Include returns whether the TimeRange includes the other TimeRange. 
+func (t TimeRange) Include(other TimeRange) bool { + var start, end bool + if t.Start.Equal(other.Start) { + start = t.IncludeStart || !other.IncludeStart + } else { + start = !t.Start.After(other.Start) + } + if t.End.Equal(other.End) { + end = t.IncludeEnd || !other.IncludeEnd + } else { + end = !t.End.Before(other.End) + } + return start && end +} + // Duration converts TimeRange to time.Duration. func (t TimeRange) Duration() time.Duration { return t.End.Sub(t.Start) diff --git a/test/cases/measure/data/input/index_mode_all.yaml b/test/cases/measure/data/input/index_mode_all.yaml new file mode 100644 index 00000000..d0c01450 --- /dev/null +++ b/test/cases/measure/data/input/index_mode_all.yaml @@ -0,0 +1,23 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "service_traffic" +groups: [ "sw_metric" ] +tagProjection: + tagFamilies: + - name: "default" + tags: [ "id", "service_id", "name", "short_name", "service_group", "layer" ] diff --git a/test/cases/measure/data/input/index_mode_order_desc.yaml b/test/cases/measure/data/input/index_mode_order_desc.yaml new file mode 100644 index 00000000..0dd0063d --- /dev/null +++ b/test/cases/measure/data/input/index_mode_order_desc.yaml @@ -0,0 +1,26 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "service_traffic" +groups: [ "sw_metric" ] +tagProjection: + tagFamilies: + - name: "default" + tags: [ "id", "service_id", "name", "layer" ] +orderBy: + sort: "SORT_DESC" + indexRuleName: "layer" diff --git a/test/cases/measure/data/input/index_mode_range.yaml b/test/cases/measure/data/input/index_mode_range.yaml new file mode 100644 index 00000000..c881b11c --- /dev/null +++ b/test/cases/measure/data/input/index_mode_range.yaml @@ -0,0 +1,30 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. 
Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "service_traffic" +groups: [ "sw_metric" ] +tagProjection: + tagFamilies: + - name: "default" + tags: [ "id", "service_id", "name", "layer" ] +criteria: + condition: + name: "layer" + op: "BINARY_OP_GT" + value: + int: + value: "1" diff --git a/test/cases/measure/data/want/index_mode_all.yaml b/test/cases/measure/data/want/index_mode_all.yaml new file mode 100644 index 00000000..3d3edeb2 --- /dev/null +++ b/test/cases/measure/data/want/index_mode_all.yaml @@ -0,0 +1,107 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+dataPoints: +- sid: "15142466043926325685" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "1" + - key: service_id + value: + str: + value: service_1 + - key: name + value: + str: + value: service_name_1 + - key: short_name + value: + str: + value: service_short_name_1 + - key: service_group + value: + str: + value: group1 + - key: layer + value: + int: + value: "1" + timestamp: "2024-11-15T01:02:00Z" + version: "1" +- sid: "3906119849472468294" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "2" + - key: service_id + value: + str: + value: service_2 + - key: name + value: + str: + value: service_name_2 + - key: short_name + value: + str: + value: service_short_name_2 + - key: service_group + value: + str: + value: group1 + - key: layer + value: + int: + value: "2" + timestamp: "2024-11-15T01:03:00Z" + version: "1" +- sid: "12370392692163567533" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "3" + - key: service_id + value: + str: + value: service_3 + - key: name + value: + str: + value: service_name_3 + - key: short_name + value: + str: + value: service_short_name_3 + - key: service_group + value: + str: + value: group1 + - key: layer + value: + int: + value: "1" + timestamp: "2024-11-15T01:04:00Z" + version: "1" diff --git a/test/cases/measure/data/want/index_mode_order_desc.yaml b/test/cases/measure/data/want/index_mode_order_desc.yaml new file mode 100644 index 00000000..470fba1c --- /dev/null +++ b/test/cases/measure/data/want/index_mode_order_desc.yaml @@ -0,0 +1,83 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+dataPoints: +- sid: "3906119849472468294" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "2" + - key: service_id + value: + str: + value: service_2 + - key: name + value: + str: + value: service_name_2 + - key: layer + value: + int: + value: "2" + timestamp: "2024-11-15T01:40:00Z" + version: "1" +- sid: "15142466043926325685" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "1" + - key: service_id + value: + str: + value: service_1 + - key: name + value: + str: + value: service_name_1 + - key: layer + value: + int: + value: "1" + timestamp: "2024-11-15T01:39:00Z" + version: "1" +- sid: "12370392692163567533" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "3" + - key: service_id + value: + str: + value: service_3 + - key: name + value: + str: + value: service_name_3 + - key: layer + value: + int: + value: "1" + timestamp: "2024-11-15T01:41:00Z" + version: "1" diff --git a/test/cases/measure/data/want/index_mode_range.yaml b/test/cases/measure/data/want/index_mode_range.yaml new file mode 100644 index 00000000..f9c67226 --- /dev/null +++ b/test/cases/measure/data/want/index_mode_range.yaml @@ -0,0 +1,39 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +dataPoints: +- sid: "3906119849472468294" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "2" + - key: service_id + value: + str: + value: service_2 + - key: name + value: + str: + value: service_name_2 + - key: layer + value: + int: + value: "2" + timestamp: "2024-11-15T01:43:00Z" + version: "1" diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index 8f843105..bc31747d 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -72,4 +72,7 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("match a tag belongs to the entity", helpers.Args{Input: "entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("order by desc of index mode", helpers.Args{Input: "index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), )
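
For reference, the new TimeRange.Include helper added in pkg/timestamp/range.go above decides containment by comparing boundaries and only consulting the IncludeStart/IncludeEnd flags when a boundary of the two ranges is exactly equal. Below is a minimal sketch of that behavior, not part of the commit; it assumes the module import path resolves as in the tests, and the nanosecond values mirror the fixtures used in inverted_series_test.go.

package main

import (
	"fmt"
	"time"

	"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

func main() {
	// A segment-like range covering nanoseconds [0, 2000], both ends inclusive.
	segment := timestamp.NewInclusiveTimeRange(time.Unix(0, 0), time.Unix(0, 2000))

	// A query window [500, 1500] is fully covered by the segment.
	window := timestamp.NewInclusiveTimeRange(time.Unix(0, 500), time.Unix(0, 1500))
	fmt.Println(segment.Include(window)) // true

	// A window starting before the segment's start is not included.
	early := timestamp.NewInclusiveTimeRange(time.Unix(0, -1), time.Unix(0, 1500))
	fmt.Println(segment.Include(early)) // false
}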
