This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch series-index in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c8b29672d5829e9f9842f7b14e429cd42861590d Author: Gao Hongtao <[email protected]> AuthorDate: Mon Jun 26 08:15:45 2023 +0000 Level up the index Signed-off-by: Gao Hongtao <[email protected]> --- CHANGES.md | 1 + banyand/measure/measure_write.go | 21 +- banyand/measure/service.go | 3 + banyand/observability/meter_prom.go | 23 ++- banyand/stream/service.go | 1 + banyand/stream/stream_write.go | 5 +- banyand/tsdb/block.go | 24 ++- banyand/tsdb/buffer.go | 3 + banyand/tsdb/index/writer.go | 15 +- banyand/tsdb/scope.go | 13 ++ banyand/tsdb/series_seek.go | 1 - banyand/tsdb/series_seek_sort.go | 5 +- banyand/tsdb/series_write.go | 23 +++ banyand/tsdb/seriesdb.go | 200 +++++++++++++++++-- banyand/tsdb/tsdb.go | 10 + pkg/query/logical/index_filter.go | 6 + .../measure/measure_plan_indexscan_local.go | 34 ++-- test/cases/measure/data/input/bottom.yaml | 4 +- test/cases/measure/data/input/group_max.yaml | 4 +- test/cases/measure/data/input/group_no_field.yaml | 4 +- test/cases/measure/data/input/in.yaml | 4 +- test/cases/measure/data/input/linked_or.yaml | 10 +- test/cases/measure/data/input/tag_filter.yaml | 2 +- test/cases/measure/data/input/top.yaml | 4 +- .../data/testdata/service_cpm_minute_data.json | 20 +- .../data/testdata/service_cpm_minute_data1.json | 44 ++-- .../data/testdata/service_cpm_minute_data2.json | 62 ------ test/cases/measure/data/want/all.yaml | 158 ++------------- test/cases/measure/data/want/bottom.yaml | 12 +- test/cases/measure/data/want/group_max.yaml | 18 +- test/cases/measure/data/want/group_no_field.yaml | 45 +++-- test/cases/measure/data/want/in.yaml | 18 +- test/cases/measure/data/want/limit.yaml | 14 +- test/cases/measure/data/want/linked_or.yaml | 18 +- test/cases/measure/data/want/order_asc.yaml | 122 +---------- test/cases/measure/data/want/order_desc.yaml | 122 +---------- test/cases/measure/data/want/tag_filter.yaml | 63 ++++-- test/cases/measure/data/want/top.yaml | 12 +- test/cases/measure/data/want/update.yaml | 222 
+++++++++++---------- test/integration/cold_query/query_suite_test.go | 2 - test/integration/other/measure_test.go | 7 +- test/integration/other/tls_test.go | 7 +- test/integration/query/query_suite_test.go | 2 - test/stress/cases/istio/report.md | 67 ++++--- 44 files changed, 683 insertions(+), 772 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c510cb25..b5a83500 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -23,6 +23,7 @@ Release Notes. - Add the Istio scenario stress test based on the data generated by the integration access log. - Generalize the index's docID to uint64. - Remove redundant ID tag type. +- Improve granularity of index in `measure` by leveling up from data point to series. ### Bugs diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go index 605bea42..2b5f561a 100644 --- a/banyand/measure/measure_write.go +++ b/banyand/measure/measure_write.go @@ -56,7 +56,8 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb if err != nil { return err } - series, err := shard.Series().Get(entity, entityValues) + seriesDB := shard.Series() + series, err := seriesDB.Get(entity, entityValues) if err != nil { return err } @@ -69,18 +70,18 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb } return err } - writeFn := func() (tsdb.Writer, error) { + writeFn := func() error { builder := wp.WriterBuilder().Time(t) for fi, family := range value.GetTagFamilies() { spec := s.schema.GetTagFamilies()[fi] bb, errMarshal := pbv1.EncodeFamily(spec, family) if errMarshal != nil { - return nil, errMarshal + return errMarshal } builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb) } if len(value.GetFields()) > len(s.schema.GetFields()) { - return nil, errors.Wrap(errMalformedElement, "fields number is more than expected") + return errors.Wrap(errMalformedElement, "fields number is more than expected") } for fi, fieldValue := range value.GetFields() { fieldSpec := 
s.schema.GetFields()[fi] @@ -90,7 +91,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb continue } if fType != fieldSpec.GetFieldType() { - return nil, errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName()) + return errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName()) } data := encodeFieldValue(fieldValue) if data == nil { @@ -101,7 +102,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb } writer, errWrite := builder.Build() if errWrite != nil { - return nil, errWrite + return errWrite } _, errWrite = writer.Write() if s.l.Debug().Enabled() { @@ -114,15 +115,15 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb Int("shard_id", int(shardID)). Msg("write measure") } - return writer, errWrite + return errWrite } - writer, err := writeFn() - if err != nil { + + if err = writeFn(); err != nil { _ = wp.Close() return err } m := index.Message{ - LocalWriter: writer, + IndexWriter: tsdb.NewSeriesIndexWriter(series.ID(), seriesDB), Value: index.Value{ TagFamilies: value.GetTagFamilies(), Timestamp: t, diff --git a/banyand/measure/service.go b/banyand/measure/service.go index cec60cf7..04c1b9ce 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -202,5 +202,8 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic repo: repo, pipeline: pipeline, stopCh: make(chan struct{}), + dbOpts: tsdb.DatabaseOpts{ + IndexGranularity: tsdb.IndexGranularitySeries, + }, }, nil } diff --git a/banyand/observability/meter_prom.go b/banyand/observability/meter_prom.go index e148f09a..bd861b4e 100644 --- a/banyand/observability/meter_prom.go +++ b/banyand/observability/meter_prom.go @@ -21,6 +21,8 @@ package observability import ( + "sync" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/prometheus/client_golang/prometheus" 
"github.com/prometheus/client_golang/prometheus/promhttp" @@ -30,7 +32,12 @@ import ( "google.golang.org/grpc" ) -var reg = prometheus.NewRegistry() +var ( + reg = prometheus.NewRegistry() + + once = sync.Once{} + srvMetrics *grpcprom.ServerMetrics +) func init() { reg.MustRegister(prometheus.NewGoCollector()) @@ -48,11 +55,13 @@ func NewMeterProvider(scope meter.Scope) meter.Provider { // MetricsServerInterceptor returns a server interceptor for metrics. func MetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) { - srvMetrics := grpcprom.NewServerMetrics( - grpcprom.WithServerHandlingTimeHistogram( - grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), - ), - ) - reg.MustRegister(srvMetrics) + once.Do(func() { + srvMetrics = grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ), + ) + reg.MustRegister(srvMetrics) + }) return srvMetrics.UnaryServerInterceptor(), srvMetrics.StreamServerInterceptor() } diff --git a/banyand/stream/service.go b/banyand/stream/service.go index 8afb9436..d395e9ca 100644 --- a/banyand/stream/service.go +++ b/banyand/stream/service.go @@ -162,6 +162,7 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic pipeline: pipeline, dbOpts: tsdb.DatabaseOpts{ EnableGlobalIndex: true, + IndexGranularity: tsdb.IndexGranularityBlock, }, stopCh: make(chan struct{}), }, nil diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go index 78b086df..8c47dc7e 100644 --- a/banyand/stream/stream_write.go +++ b/banyand/stream/stream_write.go @@ -120,8 +120,9 @@ func (s *stream) write(shardID common.ShardID, entity []byte, entityValues tsdb. 
return err } m := index.Message{ - Scope: tsdb.Entry(s.name), - LocalWriter: writer, + Scope: tsdb.Entry(s.name), + IndexWriter: writer, + GlobalItemID: writer.ItemID(), Value: index.Value{ TagFamilies: value.GetTagFamilies(), Timestamp: t, diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 4a10ceab..4f67416b 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -108,7 +108,7 @@ type block struct { type openOpts struct { tsTableFactory TSTableFactory - inverted inverted.StoreOpts + inverted *inverted.StoreOpts lsm lsm.StoreOpts } @@ -163,10 +163,12 @@ func options(ctx context.Context, root string, l *logger.Logger) (openOpts, erro } options = o.(DatabaseOpts) var opts openOpts - opts.inverted = inverted.StoreOpts{ - Path: path.Join(root, componentSecondInvertedIdx), - Logger: l.Named(componentSecondInvertedIdx), - BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec, + if options.IndexGranularity == IndexGranularityBlock { + opts.inverted = &inverted.StoreOpts{ + Path: path.Join(root, componentSecondInvertedIdx), + Logger: l.Named(componentSecondInvertedIdx), + BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec, + } } opts.lsm = lsm.StoreOpts{ Path: path.Join(root, componentSecondLSMIdx), @@ -198,13 +200,16 @@ func (b *block) open() (err error) { return err } b.closableLst = append(b.closableLst, b.tsTable) - if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil { - return err + if b.openOpts.inverted != nil { + if b.invertedIndex, err = inverted.NewStore(*b.openOpts.inverted); err != nil { + return err + } + b.closableLst = append(b.closableLst, b.invertedIndex) } if b.lsmIndex, err = lsm.NewStore(b.openOpts.lsm); err != nil { return err } - b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex) + b.closableLst = append(b.closableLst, b.lsmIndex) b.ref.Store(0) b.closed.Store(false) blockOpenedTimeSecondsGauge.Set(float64(time.Now().Unix()), b.position.LabelValues()...) 
@@ -431,6 +436,9 @@ func (d *bDelegate) writeLSMIndex(fields []index.Field, id common.ItemID) error } func (d *bDelegate) writeInvertedIndex(fields []index.Field, id common.ItemID) error { + if d.delegate.invertedIndex == nil { + return errors.New("inverted index is not enabled") + } total := 0 for _, f := range fields { total += len(f.Marshal()) diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go index 49ebdf64..c586659b 100644 --- a/banyand/tsdb/buffer.go +++ b/banyand/tsdb/buffer.go @@ -161,6 +161,9 @@ func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) { // Close gracefully closes the Buffer and ensures that all pending operations are completed. func (b *Buffer) Close() error { + if b == nil { + return nil + } b.closerOnce.Do(func() { b.entryCloser.Done() b.entryCloser.CloseThenWait() diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go index 01afa143..2bdd9bbf 100644 --- a/banyand/tsdb/index/writer.go +++ b/banyand/tsdb/index/writer.go @@ -38,10 +38,11 @@ import ( // Message wraps value and other info to generate relative indices. type Message struct { - Value Value - LocalWriter tsdb.Writer - BlockCloser io.Closer - Scope tsdb.Entry + IndexWriter tsdb.IndexWriter + BlockCloser io.Closer + Value Value + Scope tsdb.Entry + GlobalItemID tsdb.GlobalItemID } // Value represents the input data for generating indices. 
@@ -122,8 +123,8 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer { func (s *Writer) Write(m Message) { err := multierr.Combine( - s.writeLocalIndex(m.LocalWriter, m.Value), - s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value), + s.writeLocalIndex(m.IndexWriter, m.Value), + s.writeGlobalIndex(m.Scope, m.GlobalItemID, m.Value), m.BlockCloser.Close(), ) if err != nil { @@ -191,7 +192,7 @@ func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value ) } -func (s *Writer) writeLocalIndex(writer tsdb.Writer, value Value) (err error) { +func (s *Writer) writeLocalIndex(writer tsdb.IndexWriter, value Value) (err error) { collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(fields []index.Field) error) error { fields := make([]index.Field, 0) for _, ruleIndex := range ruleIndexes { diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go index d28de9eb..5a157e6b 100644 --- a/banyand/tsdb/scope.go +++ b/banyand/tsdb/scope.go @@ -21,6 +21,7 @@ import ( "context" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/index" ) var _ Shard = (*scopedShard)(nil) @@ -73,6 +74,18 @@ type scopedSeriesDatabase struct { scope Entry } +func (sdd *scopedSeriesDatabase) writeInvertedIndex(fields []index.Field, seriesID common.SeriesID) error { + return sdd.delegated.writeInvertedIndex(fields, seriesID) +} + +func (sdd *scopedSeriesDatabase) writeLSMIndex(fields []index.Field, seriesID common.SeriesID) error { + return sdd.delegated.writeLSMIndex(fields, seriesID) +} + +func (sdd *scopedSeriesDatabase) Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error) { + return sdd.delegated.Search(ctx, path.prepend(sdd.scope), filter, order) +} + func (sdd *scopedSeriesDatabase) Close() error { return nil } diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go index 2534c15e..a802df32 100644 --- a/banyand/tsdb/series_seek.go +++ 
b/banyand/tsdb/series_seek.go @@ -62,7 +62,6 @@ type seekerBuilder struct { seriesSpan *seriesSpan indexRuleForSorting *databasev1.IndexRule l *logger.Logger - rangeOptsForSorting index.RangeOpts order modelv1.Sort } diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go index 63f85ddc..4ea8a2c3 100644 --- a/banyand/tsdb/series_seek_sort.go +++ b/banyand/tsdb/series_seek_sort.go @@ -37,6 +37,7 @@ import ( var ( errUnspecifiedIndexType = errors.New("Unspecified index type") emptyFilters = make([]filterFn, 0) + rangeOpts = index.RangeOpts{} ) func (s *seekerBuilder) OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) SeekerBuilder { @@ -84,9 +85,9 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) { } switch s.indexRuleForSorting.GetType() { case databasev1.IndexRule_TYPE_TREE: - inner, err = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order) + inner, err = b.lsmIndexReader().Iterator(fieldKey, rangeOpts, s.order) case databasev1.IndexRule_TYPE_INVERTED: - inner, err = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order) + inner, err = b.invertedIndexReader().Iterator(fieldKey, rangeOpts, s.order) case databasev1.IndexRule_TYPE_UNSPECIFIED: return nil, errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting) } diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go index f9035666..501689be 100644 --- a/banyand/tsdb/series_write.go +++ b/banyand/tsdb/series_write.go @@ -207,3 +207,26 @@ func (w *writer) Write() (GlobalItemID, error) { Term: convert.Int64ToBytes(w.ts.UnixNano()), }, id.ID) } + +var _ IndexWriter = (*seriesIndexWriter)(nil) + +type seriesIndexWriter struct { + seriesDB SeriesDatabase + seriesID common.SeriesID +} + +func (s *seriesIndexWriter) WriteInvertedIndex(fields []index.Field) error { + return s.seriesDB.writeInvertedIndex(fields, s.seriesID) +} + +func (s *seriesIndexWriter) 
WriteLSMIndex(fields []index.Field) error { + return s.seriesDB.writeLSMIndex(fields, s.seriesID) +} + +// NewSeriesIndexWriter returns a new series index writer. +func NewSeriesIndexWriter(seriesID common.SeriesID, seriesDB SeriesDatabase) IndexWriter { + return &seriesIndexWriter{ + seriesID: seriesID, + seriesDB: seriesDB, + } +} diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index 910f2b0d..4b8ded90 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -23,6 +23,7 @@ import ( "errors" "io" "math" + "path" "sort" "strings" "sync" @@ -32,9 +33,15 @@ import ( "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/index/lsm" + "github.com/apache/skywalking-banyandb/pkg/index/posting" + "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -245,13 +252,22 @@ func prepend(src []byte, entry []byte) []byte { return dst } +// OrderBy specifies the order of the result. +type OrderBy struct { + Index *databasev1.IndexRule + Sort modelv1.Sort +} + // SeriesDatabase allows retrieving series. 
type SeriesDatabase interface { io.Closer GetByID(id common.SeriesID) (Series, error) Get(key []byte, entityValues EntityValues) (Series, error) List(ctx context.Context, path Path) (SeriesList, error) + Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error) SizeOnDisk() int64 + writeInvertedIndex(fields []index.Field, seriesID common.SeriesID) error + writeLSMIndex(fields []index.Field, seriesID common.SeriesID) error } type blockDatabase interface { @@ -270,6 +286,8 @@ var ( type seriesDB struct { seriesMetadata kv.Store + invertedIndex index.Store + lsmIndex index.Store l *logger.Logger segCtrl *segmentController position common.Position @@ -277,6 +295,21 @@ type seriesDB struct { sID common.ShardID } +func (s *seriesDB) writeInvertedIndex(fields []index.Field, seriesID common.SeriesID) error { + if s.invertedIndex == nil { + return errors.New("inverted index is not enabled") + } + return s.invertedIndex.Write(fields, uint64(seriesID)) +} + +func (s *seriesDB) writeLSMIndex(fields []index.Field, seriesID common.SeriesID) error { + if s.lsmIndex == nil { + return errors.New("lsm index is not enabled") + } + return s.lsmIndex.Write(fields, uint64(seriesID)) +} + +// nolint: contextcheck func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) { var series string if e := s.l.Debug(); e.Enabled() { @@ -404,7 +437,8 @@ func (s *seriesDB) List(ctx context.Context, path Path) (SeriesList, error) { Uint64("series_id", uint64(seriesID)). 
Msg("got a series with a full path") } - return []Series{newSeries(ctx, seriesID, series, s)}, nil + // nolint: contextcheck + return []Series{newSeries(s.context(), seriesID, series, s)}, nil } result := make([]Series, 0) var err error @@ -446,6 +480,123 @@ func (s *seriesDB) List(ctx context.Context, path Path) (SeriesList, error) { return result, err } +func (s *seriesDB) Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error) { + if s.invertedIndex == nil || s.lsmIndex == nil { + return nil, errors.New("search is not supported") + } + if path.isFull { + return s.List(ctx, path) + } + if order == nil { + return s.filterSeries(ctx, path, filter) + } + fieldKey := index.FieldKey{ + IndexRuleID: order.Index.GetMetadata().Id, + } + var iter index.FieldIterator + var err error + switch order.Index.Type { + case databasev1.IndexRule_TYPE_TREE: + iter, err = s.lsmIndex.Iterator(fieldKey, rangeOpts, order.Sort) + case databasev1.IndexRule_TYPE_INVERTED: + iter, err = s.invertedIndex.Iterator(fieldKey, rangeOpts, order.Sort) + default: + return nil, errUnspecifiedIndexType + } + if err != nil { + return nil, err + } + defer func() { + err = multierr.Append(err, iter.Close()) + }() + var pl posting.List + if pl, err = s.seriesFilter(ctx, path, filter); err != nil { + return nil, err + } + seriesList := make([]Series, 0) + for iter.Next() { + pv := iter.Val().Value + if err = pv.Intersect(pl); err != nil { + return nil, err + } + if pv.IsEmpty() { + continue + } + pIter := pv.Iterator() + for iter.Next() { + var series Series + if series, err = s.GetByID(common.SeriesID(pIter.Current())); err != nil { + return nil, multierr.Append(err, pIter.Close()) + } + seriesList = append(seriesList, series) + } + if err = pIter.Close(); err != nil { + return nil, err + } + } + return seriesList, err +} + +func (s *seriesDB) filterSeries(ctx context.Context, path Path, filter index.Filter) (SeriesList, error) { + var seriesList SeriesList + var 
err error + if filter == nil { + return s.List(ctx, path) + } + var pl posting.List + if pl, err = filter.Execute(func(ruleType databasev1.IndexRule_Type) (index.Searcher, error) { + switch ruleType { + case databasev1.IndexRule_TYPE_TREE: + return s.lsmIndex, nil + case databasev1.IndexRule_TYPE_INVERTED: + return s.invertedIndex, nil + default: + return nil, errUnspecifiedIndexType + } + }, 0); err != nil { + return nil, err + } + + if len(path.seekKey) == 0 { + iter := pl.Iterator() + defer func() { + err = multierr.Append(err, iter.Close()) + }() + for iter.Next() { + var series Series + if series, err = s.GetByID(common.SeriesID(iter.Current())); err != nil { + return nil, err + } + seriesList = append(seriesList, series) + } + return seriesList, err + } + if seriesList, err = s.List(ctx, path); err != nil { + return nil, err + } + // Remove a series from seriesList if its ID is not in the pl + for i := 0; i < len(seriesList); i++ { + if !pl.Contains(uint64(seriesList[i].ID())) { + seriesList = append(seriesList[:i], seriesList[i+1:]...) 
+ i-- + } + } + return seriesList, nil +} + +func (s *seriesDB) seriesFilter(ctx context.Context, path Path, filter index.Filter) (posting.List, error) { + var sl SeriesList + var err error + if sl, err = s.filterSeries(ctx, path, filter); err != nil { + return nil, err + } + pl := roaring.NewPostingList() + for _, series := range sl { + pl.Insert(uint64(series.ID())) + } + return pl, nil +} + func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) ([]blockDelegate, error) { result := make([]blockDelegate, 0) for _, s := range s.segCtrl.span(timeRange) { @@ -497,10 +648,17 @@ func (s *seriesDB) context() context.Context { } func (s *seriesDB) Close() error { - return s.seriesMetadata.Close() + err := s.seriesMetadata.Close() + if s.invertedIndex != nil { + err = multierr.Append(err, s.invertedIndex.Close()) + } + if s.lsmIndex != nil { + err = multierr.Append(err, s.lsmIndex.Close()) + } + return err } -func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segCtrl *segmentController) (SeriesDatabase, error) { +func newSeriesDataBase(ctx context.Context, shardID common.ShardID, root string, segCtrl *segmentController) (SeriesDatabase, error) { sdb := &seriesDB{ sID: shardID, segCtrl: segCtrl, @@ -508,25 +666,41 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, position: common.GetPosition(ctx), } o := ctx.Value(OptionsKey) + var options DatabaseOpts + if o == nil { + options = DatabaseOpts{} + } else { + options = o.(DatabaseOpts) + } var memSize int64 - if o != nil { - options := o.(DatabaseOpts) - if options.SeriesMemSize > 1 { - memSize = int64(options.SeriesMemSize) - } else { - memSize = defaultKVMemorySize - } + if options.SeriesMemSize > 1 { + memSize = int64(options.SeriesMemSize) } else { memSize = defaultKVMemorySize } var err error - sdb.seriesMetadata, err = kv.OpenStore(path+"/md", + if sdb.seriesMetadata, err = kv.OpenStore(root+"/md", kv.StoreWithNamedLogger("metadata", 
sdb.l), kv.StoreWithMemTableSize(memSize), - ) - if err != nil { + ); err != nil { return nil, err } + if options.IndexGranularity == IndexGranularitySeries { + if sdb.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{ + Path: path.Join(root, componentSecondInvertedIdx), + Logger: sdb.l.Named(componentSecondInvertedIdx), + BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec, + }); err != nil { + return nil, err + } + if sdb.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{ + Path: path.Join(root, componentSecondLSMIdx), + Logger: sdb.l.Named(componentSecondLSMIdx), + MemTableSize: defaultKVMemorySize, + }); err != nil { + return nil, err + } + } return sdb, nil } diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index c3035d39..c47f2d72 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -100,6 +100,15 @@ type Shard interface { TriggerSchedule(task string) bool } +// IndexGranularity denotes the granularity of the local index. +type IndexGranularity int + +// The options of the local index granularity. +const ( + IndexGranularityBlock IndexGranularity = iota + IndexGranularitySeries +) + var _ Database = (*database)(nil) // DatabaseOpts wraps options to create a tsdb. @@ -114,6 +123,7 @@ type DatabaseOpts struct { GlobalIndexMemSize run.Bytes ShardNum uint32 EnableGlobalIndex bool + IndexGranularity IndexGranularity } // InvertedIndexOpts wraps options to create the block inverted index. 
diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go index 85bf74ea..0d16d180 100644 --- a/pkg/query/logical/index_filter.go +++ b/pkg/query/logical/index_filter.go @@ -81,6 +81,12 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ if le.GetLeft() == nil && le.GetRight() == nil { return nil, nil, errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) } + if le.GetLeft() == nil { + return BuildLocalFilter(le.Right, schema, entityDict, entity) + } + if le.GetRight() == nil { + return BuildLocalFilter(le.Left, schema, entityDict, entity) + } left, leftEntities, err := BuildLocalFilter(le.Left, schema, entityDict, entity) if err != nil { return nil, nil, err diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index af2ea278..1b45fdfd 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -123,6 +123,13 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) { } func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit executor.MIterator, err error) { + var orderBy *tsdb.OrderBy + if i.order.Index != nil { + orderBy = &tsdb.OrderBy{ + Index: i.order.Index, + Sort: i.order.Sort, + } + } var seriesList tsdb.SeriesList for _, e := range i.entities { shards, errInternal := ec.Shards(e) @@ -130,11 +137,16 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit execu return nil, errInternal } for _, shard := range shards { - sl, errInternal := shard.Series().List(context.WithValue( - context.Background(), - logger.ContextKey, - i.l, - ), tsdb.NewPath(e)) + sl, errInternal := shard.Series().Search( + context.WithValue( + context.Background(), + logger.ContextKey, + i.l, + ), + tsdb.NewPath(e), + i.filter, + orderBy, + ) if errInternal != nil { return nil, 
errInternal } @@ -145,20 +157,12 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit execu return dummyIter, nil } var builders []logical.SeekerBuilder - if i.order.Index != nil { - builders = append(builders, func(builder tsdb.SeekerBuilder) { - builder.OrderByIndex(i.order.Index, i.order.Sort) - }) - } else { + if i.order.Index == nil { builders = append(builders, func(builder tsdb.SeekerBuilder) { builder.OrderByTime(i.order.Sort) }) } - if i.filter != nil { - builders = append(builders, func(b tsdb.SeekerBuilder) { - b.Filter(i.filter) - }) - } + // CAVEAT: the order of series list matters when sorting by an index. iters, closers, err := logical.ExecuteForShard(i.l, seriesList, i.timeRange, builders...) if err != nil { return nil, err diff --git a/test/cases/measure/data/input/bottom.yaml b/test/cases/measure/data/input/bottom.yaml index 4aee3650..0cc0e4c2 100644 --- a/test/cases/measure/data/input/bottom.yaml +++ b/test/cases/measure/data/input/bottom.yaml @@ -21,14 +21,14 @@ metadata: tagProjection: tagFamilies: - name: "default" - tags: ["entity_id"] + tags: ["id"] fieldProjection: names: ["total", "value"] groupBy: tagProjection: tagFamilies: - name: "default" - tags: ["entity_id"] + tags: ["id"] fieldName: "value" agg: function: "AGGREGATION_FUNCTION_MEAN" diff --git a/test/cases/measure/data/input/group_max.yaml b/test/cases/measure/data/input/group_max.yaml index 80e7dd4a..8287303b 100644 --- a/test/cases/measure/data/input/group_max.yaml +++ b/test/cases/measure/data/input/group_max.yaml @@ -21,14 +21,14 @@ metadata: tagProjection: tagFamilies: - name: "default" - tags: ["entity_id"] + tags: ["id"] fieldProjection: names: ["total", "value"] groupBy: tagProjection: tagFamilies: - name: "default" - tags: ["entity_id"] + tags: ["id"] fieldName: "value" agg: function: "AGGREGATION_FUNCTION_MAX" diff --git a/test/cases/measure/data/input/group_no_field.yaml b/test/cases/measure/data/input/group_no_field.yaml index 7ae6d120..b9632f78 
100644 --- a/test/cases/measure/data/input/group_no_field.yaml +++ b/test/cases/measure/data/input/group_no_field.yaml @@ -21,10 +21,10 @@ metadata: tagProjection: tagFamilies: - name: "default" - tags: ["entity_id"] + tags: ["id"] groupBy: tagProjection: tagFamilies: - name: "default" - tags: ["entity_id"] + tags: ["id"] diff --git a/test/cases/measure/data/input/in.yaml b/test/cases/measure/data/input/in.yaml index 7dcef0db..7b0fe828 100644 --- a/test/cases/measure/data/input/in.yaml +++ b/test/cases/measure/data/input/in.yaml @@ -26,8 +26,8 @@ fieldProjection: names: ["total", "value"] criteria: condition: - name: "id" + name: "entity_id" op: "BINARY_OP_IN" value: str_array: - value: ["4", "5", "unknown"] + value: ["entity_4", "entity_5", "unknown"] diff --git a/test/cases/measure/data/input/linked_or.yaml b/test/cases/measure/data/input/linked_or.yaml index 6d0b3b4e..bf9d90ad 100644 --- a/test/cases/measure/data/input/linked_or.yaml +++ b/test/cases/measure/data/input/linked_or.yaml @@ -29,27 +29,27 @@ criteria: op: "LOGICAL_OP_OR" left: condition: - name: "id" + name: "entity_id" op: "BINARY_OP_EQ" value: str: - value: "4" + value: "entity_4" right: le: op: "LOGICAL_OP_OR" left: condition: - name: "id" + name: "entity_id" op: "BINARY_OP_EQ" value: str: - value: "5" + value: "entity_5" right: le: op: "LOGICAL_OP_OR" left: condition: - name: "id" + name: "entity_id" op: "BINARY_OP_EQ" value: str: diff --git a/test/cases/measure/data/input/tag_filter.yaml b/test/cases/measure/data/input/tag_filter.yaml index d5a48980..b5481e8f 100644 --- a/test/cases/measure/data/input/tag_filter.yaml +++ b/test/cases/measure/data/input/tag_filter.yaml @@ -30,4 +30,4 @@ criteria: op: "BINARY_OP_EQ" value: str: - value: "1" + value: "svc1" diff --git a/test/cases/measure/data/input/top.yaml b/test/cases/measure/data/input/top.yaml index c103d033..5d66f2dc 100644 --- a/test/cases/measure/data/input/top.yaml +++ b/test/cases/measure/data/input/top.yaml @@ -21,14 +21,14 @@ metadata: 
tagProjection: tagFamilies: - name: "default" - tags: ["entity_id"] + tags: ["id"] fieldProjection: names: ["value"] groupBy: tagProjection: tagFamilies: - name: "default" - tags: ["entity_id"] + tags: ["id"] fieldName: "value" agg: function: "AGGREGATION_FUNCTION_MEAN" diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data.json b/test/cases/measure/data/testdata/service_cpm_minute_data.json index 82d4203d..5dc3808a 100644 --- a/test/cases/measure/data/testdata/service_cpm_minute_data.json +++ b/test/cases/measure/data/testdata/service_cpm_minute_data.json @@ -5,7 +5,7 @@ "tags": [ { "str": { - "value": "1" + "value": "svc1" } }, { @@ -35,7 +35,7 @@ "tags": [ { "str": { - "value": "4" + "value": "svc1" } }, { @@ -65,12 +65,12 @@ "tags": [ { "str": { - "value": "5" + "value": "svc1" } }, { "str": { - "value": "entity_2" + "value": "entity_3" } } ] @@ -95,12 +95,12 @@ "tags": [ { "str": { - "value": "6" + "value": "svc2" } }, { "str": { - "value": "entity_3" + "value": "entity_4" } } ] @@ -125,12 +125,12 @@ "tags": [ { "str": { - "value": "2" + "value": "svc2" } }, { "str": { - "value": "entity_1" + "value": "entity_5" } } ] @@ -155,12 +155,12 @@ "tags": [ { "str": { - "value": "3" + "value": "svc3" } }, { "str": { - "value": "entity_1" + "value": "entity_6" } } ] diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_cpm_minute_data1.json index 7ba3cd2b..e7567d71 100644 --- a/test/cases/measure/data/testdata/service_cpm_minute_data1.json +++ b/test/cases/measure/data/testdata/service_cpm_minute_data1.json @@ -5,12 +5,12 @@ "tags": [ { "str": { - "value": "7" + "value": "svc1" } }, { "str": { - "value": "entity_2" + "value": "entity_1" } } ] @@ -19,12 +19,12 @@ "fields": [ { "int": { - "value": 100 + "value": 200 } }, { "int": { - "value": 10 + "value": 2 } } ] @@ -35,7 +35,7 @@ "tags": [ { "str": { - "value": "8" + "value": "svc1" } }, { @@ -49,12 +49,12 @@ "fields": [ { "int": { - 
"value": 100 + "value": 200 } }, { "int": { - "value": 9 + "value": 3 } } ] @@ -65,12 +65,12 @@ "tags": [ { "str": { - "value": "9" + "value": "svc1" } }, { "str": { - "value": "entity_2" + "value": "entity_3" } } ] @@ -79,12 +79,12 @@ "fields": [ { "int": { - "value": 100 + "value": 200 } }, { "int": { - "value": 8 + "value": 4 } } ] @@ -95,12 +95,12 @@ "tags": [ { "str": { - "value": "10" + "value": "svc2" } }, { "str": { - "value": "entity_3" + "value": "entity_4" } } ] @@ -109,12 +109,12 @@ "fields": [ { "int": { - "value": 100 + "value": 200 } }, { "int": { - "value": 11 + "value": 6 } } ] @@ -125,12 +125,12 @@ "tags": [ { "str": { - "value": "11" + "value": "svc2" } }, { "str": { - "value": "entity_1" + "value": "entity_5" } } ] @@ -139,12 +139,12 @@ "fields": [ { "int": { - "value": 50 + "value": 60 } }, { "int": { - "value": 12 + "value": 5 } } ] @@ -155,12 +155,12 @@ "tags": [ { "str": { - "value": "12" + "value": "svc3" } }, { "str": { - "value": "entity_1" + "value": "entity_6" } } ] @@ -169,7 +169,7 @@ "fields": [ { "int": { - "value": 300 + "value": 400 } }, { diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data2.json b/test/cases/measure/data/testdata/service_cpm_minute_data2.json deleted file mode 100644 index 94dcf8ec..00000000 --- a/test/cases/measure/data/testdata/service_cpm_minute_data2.json +++ /dev/null @@ -1,62 +0,0 @@ -[ - { - "tag_families": [ - { - "tags": [ - { - "str": { - "value": "100" - } - }, - { - "str": { - "value": "entity_2" - } - } - ] - } - ], - "fields": [ - { - "int": { - "value": 100 - } - }, - { - "int": { - "value": 100 - } - } - ] - }, - { - "tag_families": [ - { - "tags": [ - { - "str": { - "value": "110" - } - }, - { - "str": { - "value": "entity_2" - } - } - ] - } - ], - "fields": [ - { - "int": { - "value": 100 - } - }, - { - "int": { - "value": 110 - } - } - ] - } -] diff --git a/test/cases/measure/data/want/all.yaml b/test/cases/measure/data/want/all.yaml index 1e9612e1..00b18290 100644 --- 
a/test/cases/measure/data/want/all.yaml +++ b/test/cases/measure/data/want/all.yaml @@ -31,33 +31,12 @@ dataPoints: - key: id value: str: - value: "1" + value: svc1 - key: entity_id value: str: value: entity_1 - timestamp: "2022-10-17T12:49:45.912Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "10" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "7" - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:49:46.912Z" + timestamp: "2023-06-25T23:26:00Z" - fields: - name: total value: @@ -73,33 +52,12 @@ dataPoints: - key: id value: str: - value: "4" + value: svc1 - key: entity_id value: str: value: entity_2 - timestamp: "2022-10-17T12:50:45.912Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "9" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "8" - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:50:46.912Z" + timestamp: "2023-06-25T23:27:00Z" - fields: - name: total value: @@ -115,54 +73,12 @@ dataPoints: - key: id value: str: - value: "5" - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:51:45.912Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "8" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "9" - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:51:46.912Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "5" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "6" + value: svc1 - key: entity_id value: str: value: entity_3 - timestamp: "2022-10-17T12:52:45.912Z" + timestamp: "2023-06-25T23:28:00Z" - fields: - name: total value: @@ -171,19 +87,19 @@ dataPoints: - name: value value: int: - value: "11" + value: "5" tagFamilies: - 
name: default tags: - key: id value: str: - value: "10" + value: svc2 - key: entity_id value: str: - value: entity_3 - timestamp: "2022-10-17T12:52:46.912Z" + value: entity_4 + timestamp: "2023-06-25T23:29:00Z" - fields: - name: total value: @@ -199,33 +115,12 @@ dataPoints: - key: id value: str: - value: "2" - - key: entity_id - value: - str: - value: entity_1 - timestamp: "2022-10-17T12:53:45.912Z" -- fields: - - name: total - value: - int: - value: "50" - - name: value - value: - int: - value: "12" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "11" + value: svc2 - key: entity_id value: str: - value: entity_1 - timestamp: "2022-10-17T12:53:46.912Z" + value: entity_5 + timestamp: "2023-06-25T23:30:00Z" - fields: - name: total value: @@ -241,30 +136,9 @@ dataPoints: - key: id value: str: - value: "3" - - key: entity_id - value: - str: - value: entity_1 - timestamp: "2022-10-17T12:54:45.912Z" -- fields: - - name: total - value: - int: - value: "300" - - name: value - value: - int: - value: "7" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "12" + value: svc3 - key: entity_id value: str: - value: entity_1 - timestamp: "2022-10-17T12:54:46.912Z" + value: entity_6 + timestamp: "2023-06-25T23:31:00Z" diff --git a/test/cases/measure/data/want/bottom.yaml b/test/cases/measure/data/want/bottom.yaml index 760d5a73..3c86cefe 100644 --- a/test/cases/measure/data/want/bottom.yaml +++ b/test/cases/measure/data/want/bottom.yaml @@ -20,23 +20,23 @@ dataPoints: - name: value value: int: - value: "6" + value: "2" tagFamilies: - name: default tags: - - key: entity_id + - key: id value: str: - value: entity_2 + value: svc1 - fields: - name: value value: int: - value: "6" + value: "4" tagFamilies: - name: default tags: - - key: entity_id + - key: id value: str: - value: entity_1 \ No newline at end of file + value: svc2 diff --git a/test/cases/measure/data/want/group_max.yaml b/test/cases/measure/data/want/group_max.yaml 
index bbe39234..5bf44e0e 100644 --- a/test/cases/measure/data/want/group_max.yaml +++ b/test/cases/measure/data/want/group_max.yaml @@ -20,35 +20,35 @@ dataPoints: - name: value value: int: - value: "12" + value: "3" tagFamilies: - name: default tags: - - key: entity_id + - key: id value: str: - value: entity_1 + value: svc1 - fields: - name: value value: int: - value: "10" + value: "5" tagFamilies: - name: default tags: - - key: entity_id + - key: id value: str: - value: entity_2 + value: svc2 - fields: - name: value value: int: - value: "11" + value: "6" tagFamilies: - name: default tags: - - key: entity_id + - key: id value: str: - value: entity_3 \ No newline at end of file + value: svc3 \ No newline at end of file diff --git a/test/cases/measure/data/want/group_no_field.yaml b/test/cases/measure/data/want/group_no_field.yaml index b506f875..572750b3 100644 --- a/test/cases/measure/data/want/group_no_field.yaml +++ b/test/cases/measure/data/want/group_no_field.yaml @@ -16,24 +16,27 @@ # under the License. 
dataPoints: - - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_1 - - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_2 - - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_3 \ No newline at end of file +- tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: svc1 + timestamp: "2023-06-25T23:51:00Z" +- tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: svc2 + timestamp: "2023-06-25T23:54:00Z" +- tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: svc3 + timestamp: "2023-06-25T23:56:00Z" \ No newline at end of file diff --git a/test/cases/measure/data/want/in.yaml b/test/cases/measure/data/want/in.yaml index cc339b74..c4d58638 100644 --- a/test/cases/measure/data/want/in.yaml +++ b/test/cases/measure/data/want/in.yaml @@ -24,37 +24,37 @@ dataPoints: - name: value value: int: - value: "2" + value: "5" tagFamilies: - name: default tags: - key: id value: str: - value: "4" + value: svc2 - key: entity_id value: str: - value: entity_2 - timestamp: "2022-10-17T12:50:45.912Z" + value: entity_4 + timestamp: "2023-06-26T01:38:00Z" - fields: - name: total value: int: - value: "100" + value: "50" - name: value value: int: - value: "3" + value: "4" tagFamilies: - name: default tags: - key: id value: str: - value: "5" + value: svc2 - key: entity_id value: str: - value: entity_2 - timestamp: "2022-10-17T12:51:45.912Z" + value: entity_5 + timestamp: "2023-06-26T01:39:00Z" diff --git a/test/cases/measure/data/want/limit.yaml b/test/cases/measure/data/want/limit.yaml index ca93ea1f..103a8c5d 100644 --- a/test/cases/measure/data/want/limit.yaml +++ b/test/cases/measure/data/want/limit.yaml @@ -24,29 +24,29 @@ dataPoints: - name: value value: int: - value: "9" + value: "5" tagFamilies: - name: default tags: - key: entity_id value: str: - value: entity_2 - timestamp: "2022-10-17T12:57:03.444Z" 
+ value: entity_4 + timestamp: "2023-06-26T01:01:00Z" - fields: - name: total value: int: - value: "100" + value: "50" - name: value value: int: - value: "3" + value: "4" tagFamilies: - name: default tags: - key: entity_id value: str: - value: entity_2 - timestamp: "2022-10-17T12:58:02.444Z" \ No newline at end of file + value: entity_5 + timestamp: "2023-06-26T01:02:00Z" \ No newline at end of file diff --git a/test/cases/measure/data/want/linked_or.yaml b/test/cases/measure/data/want/linked_or.yaml index cc339b74..9de74079 100644 --- a/test/cases/measure/data/want/linked_or.yaml +++ b/test/cases/measure/data/want/linked_or.yaml @@ -24,37 +24,37 @@ dataPoints: - name: value value: int: - value: "2" + value: "5" tagFamilies: - name: default tags: - key: id value: str: - value: "4" + value: svc2 - key: entity_id value: str: - value: entity_2 - timestamp: "2022-10-17T12:50:45.912Z" + value: entity_4 + timestamp: "2023-06-26T01:35:00Z" - fields: - name: total value: int: - value: "100" + value: "50" - name: value value: int: - value: "3" + value: "4" tagFamilies: - name: default tags: - key: id value: str: - value: "5" + value: svc2 - key: entity_id value: str: - value: entity_2 - timestamp: "2022-10-17T12:51:45.912Z" + value: entity_5 + timestamp: "2023-06-26T01:36:00Z" diff --git a/test/cases/measure/data/want/order_asc.yaml b/test/cases/measure/data/want/order_asc.yaml index d55db5e0..0c6e0394 100644 --- a/test/cases/measure/data/want/order_asc.yaml +++ b/test/cases/measure/data/want/order_asc.yaml @@ -32,24 +32,7 @@ dataPoints: value: str: value: entity_1 - timestamp: "2022-10-17T12:54:36.627Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "10" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:54:37.627Z" + timestamp: "2023-06-26T00:31:00Z" - fields: - name: total value: @@ -66,24 +49,7 @@ dataPoints: value: str: value: entity_2 - timestamp: 
"2022-10-17T12:55:36.627Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "9" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:55:37.627Z" + timestamp: "2023-06-26T00:32:00Z" - fields: - name: total value: @@ -94,47 +60,13 @@ dataPoints: int: value: "3" tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:56:36.627Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "8" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:56:37.627Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "5" - tagFamilies: - name: default tags: - key: entity_id value: str: value: entity_3 - timestamp: "2022-10-17T12:57:36.627Z" + timestamp: "2023-06-26T00:33:00Z" - fields: - name: total value: @@ -143,15 +75,15 @@ dataPoints: - name: value value: int: - value: "11" + value: "5" tagFamilies: - name: default tags: - key: entity_id value: str: - value: entity_3 - timestamp: "2022-10-17T12:57:37.627Z" + value: entity_4 + timestamp: "2023-06-26T00:34:00Z" - fields: - name: total value: @@ -167,25 +99,8 @@ dataPoints: - key: entity_id value: str: - value: entity_1 - timestamp: "2022-10-17T12:58:36.627Z" -- fields: - - name: total - value: - int: - value: "50" - - name: value - value: - int: - value: "12" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_1 - timestamp: "2022-10-17T12:58:37.627Z" + value: entity_5 + timestamp: "2023-06-26T00:35:00Z" - fields: - name: total value: @@ -201,22 +116,5 @@ dataPoints: - key: entity_id value: str: - value: entity_1 - timestamp: "2022-10-17T12:59:36.627Z" -- fields: - - name: total - value: - int: - value: "300" - - name: value - value: - int: - value: "7" - 
tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_1 - timestamp: "2022-10-17T12:59:37.627Z" \ No newline at end of file + value: entity_6 + timestamp: "2023-06-26T00:36:00Z" diff --git a/test/cases/measure/data/want/order_desc.yaml b/test/cases/measure/data/want/order_desc.yaml index 4c4f749e..cd41c5dc 100644 --- a/test/cases/measure/data/want/order_desc.yaml +++ b/test/cases/measure/data/want/order_desc.yaml @@ -16,23 +16,6 @@ # under the License. dataPoints: -- fields: - - name: total - value: - int: - value: "300" - - name: value - value: - int: - value: "7" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_1 - timestamp: "2022-10-17T13:01:03.444Z" - fields: - name: total value: @@ -48,25 +31,8 @@ dataPoints: - key: entity_id value: str: - value: entity_1 - timestamp: "2022-10-17T13:01:02.444Z" -- fields: - - name: total - value: - int: - value: "50" - - name: value - value: - int: - value: "12" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_1 - timestamp: "2022-10-17T13:00:03.444Z" + value: entity_6 + timestamp: "2023-06-26T00:59:00Z" - fields: - name: total value: @@ -82,25 +48,8 @@ dataPoints: - key: entity_id value: str: - value: entity_1 - timestamp: "2022-10-17T13:00:02.444Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "11" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_3 - timestamp: "2022-10-17T12:59:03.444Z" + value: entity_5 + timestamp: "2023-06-26T00:58:00Z" - fields: - name: total value: @@ -116,25 +65,8 @@ dataPoints: - key: entity_id value: str: - value: entity_3 - timestamp: "2022-10-17T12:59:02.444Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "8" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_2 - timestamp: 
"2022-10-17T12:58:03.444Z" + value: entity_4 + timestamp: "2023-06-26T00:57:00Z" - fields: - name: total value: @@ -150,25 +82,8 @@ dataPoints: - key: entity_id value: str: - value: entity_2 - timestamp: "2022-10-17T12:58:02.444Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "9" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:57:03.444Z" + value: entity_3 + timestamp: "2023-06-26T00:56:00Z" - fields: - name: total value: @@ -185,24 +100,7 @@ dataPoints: value: str: value: entity_2 - timestamp: "2022-10-17T12:57:02.444Z" -- fields: - - name: total - value: - int: - value: "100" - - name: value - value: - int: - value: "10" - tagFamilies: - - name: default - tags: - - key: entity_id - value: - str: - value: entity_2 - timestamp: "2022-10-17T12:56:03.444Z" + timestamp: "2023-06-26T00:55:00Z" - fields: - name: total value: @@ -219,4 +117,4 @@ dataPoints: value: str: value: entity_1 - timestamp: "2022-10-17T12:56:02.444Z" \ No newline at end of file + timestamp: "2023-06-26T00:54:00Z" diff --git a/test/cases/measure/data/want/tag_filter.yaml b/test/cases/measure/data/want/tag_filter.yaml index a588ad4a..333859af 100644 --- a/test/cases/measure/data/want/tag_filter.yaml +++ b/test/cases/measure/data/want/tag_filter.yaml @@ -16,19 +16,54 @@ # under the License. 
dataPoints: - - fields: - - name: total +- fields: + - name: total + value: + int: + value: "100" + - name: value + value: + int: + value: "1" + tagFamilies: + - name: default + tags: + - key: id value: - int: - value: "100" - - name: value + str: + value: svc1 + timestamp: "2023-06-25T23:33:00Z" +- fields: + - name: total + value: + int: + value: "100" + - name: value + value: + int: + value: "2" + tagFamilies: + - name: default + tags: + - key: id value: - int: - value: "1" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "1" \ No newline at end of file + str: + value: svc1 + timestamp: "2023-06-25T23:34:00Z" +- fields: + - name: total + value: + int: + value: "100" + - name: value + value: + int: + value: "3" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: svc1 + timestamp: "2023-06-25T23:35:00Z" \ No newline at end of file diff --git a/test/cases/measure/data/want/top.yaml b/test/cases/measure/data/want/top.yaml index ac6d57cc..972adee0 100644 --- a/test/cases/measure/data/want/top.yaml +++ b/test/cases/measure/data/want/top.yaml @@ -20,23 +20,23 @@ dataPoints: - name: value value: int: - value: "8" + value: "6" tagFamilies: - name: default tags: - - key: entity_id + - key: id value: str: - value: entity_3 + value: svc3 - fields: - name: value value: int: - value: "6" + value: "4" tagFamilies: - name: default tags: - - key: entity_id + - key: id value: str: - value: entity_2 \ No newline at end of file + value: svc2 \ No newline at end of file diff --git a/test/cases/measure/data/want/update.yaml b/test/cases/measure/data/want/update.yaml index ee0f55d1..9bf60003 100644 --- a/test/cases/measure/data/want/update.yaml +++ b/test/cases/measure/data/want/update.yaml @@ -16,123 +16,129 @@ # under the License. 
dataPoints: - - fields: - - name: total +- fields: + - name: total + value: + int: + value: "200" + - name: value + value: + int: + value: "2" + tagFamilies: + - name: default + tags: + - key: id value: - int: - value: "200" - - name: value + str: + value: svc1 + - key: entity_id value: - int: - value: "3" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "1" - - key: entity_id - value: - str: - value: entity_1 - - fields: - - name: total + str: + value: entity_1 + timestamp: "2023-06-26T04:49:57.500Z" +- fields: + - name: total + value: + int: + value: "200" + - name: value + value: + int: + value: "3" + tagFamilies: + - name: default + tags: + - key: id value: - int: - value: "100" - - name: value + str: + value: svc1 + - key: entity_id value: - int: - value: "1" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "4" - - key: entity_id - value: - str: - value: entity_2 - - fields: - - name: total + str: + value: entity_2 + timestamp: "2023-06-26T04:49:58Z" +- fields: + - name: total + value: + int: + value: "200" + - name: value + value: + int: + value: "4" + tagFamilies: + - name: default + tags: + - key: id value: - int: - value: "100" - - name: value + str: + value: svc1 + - key: entity_id value: - int: - value: "1" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "5" - - key: entity_id - value: - str: - value: entity_2 - - fields: - - name: total + str: + value: entity_3 + timestamp: "2023-06-26T04:49:58.500Z" +- fields: + - name: total + value: + int: + value: "200" + - name: value + value: + int: + value: "6" + tagFamilies: + - name: default + tags: + - key: id value: - int: - value: "100" - - name: value + str: + value: svc2 + - key: entity_id value: - int: - value: "5" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "6" - - key: entity_id - value: - str: - value: entity_3 - - fields: - - name: total + str: + value: entity_4 + timestamp: 
"2023-06-26T04:49:59Z" +- fields: + - name: total + value: + int: + value: "60" + - name: value + value: + int: + value: "5" + tagFamilies: + - name: default + tags: + - key: id value: - int: - value: "50" - - name: value + str: + value: svc2 + - key: entity_id value: - int: - value: "4" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "2" - - key: entity_id - value: - str: - value: entity_1 - - fields: - - name: total + str: + value: entity_5 + timestamp: "2023-06-26T04:49:59.500Z" +- fields: + - name: total + value: + int: + value: "400" + - name: value + value: + int: + value: "7" + tagFamilies: + - name: default + tags: + - key: id value: - int: - value: "300" - - name: value + str: + value: svc3 + - key: entity_id value: - int: - value: "5" - tagFamilies: - - name: default - tags: - - key: id - value: - str: - value: "3" - - key: entity_id - value: - str: - value: entity_1 \ No newline at end of file + str: + value: entity_6 + timestamp: "2023-06-26T04:50:00Z" diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go index 0319b5f5..9f3c16df 100644 --- a/test/integration/cold_query/query_suite_test.go +++ b/test/integration/cold_query/query_suite_test.go @@ -73,8 +73,6 @@ var _ = SynchronizedBeforeSuite(func() []byte { casesmeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval) casesmeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval) casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval) - casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval) - casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval) casesmeasureData.Write(conn, "instance_clr_cpu_minute", 
"sw_metric", "instance_clr_cpu_minute_data.json", now, interval) casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval) casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval) diff --git a/test/integration/other/measure_test.go b/test/integration/other/measure_test.go index 1df0b041..24800b19 100644 --- a/test/integration/other/measure_test.go +++ b/test/integration/other/measure_test.go @@ -49,7 +49,8 @@ var _ = g.Describe("Query service_cpm_minute", func() { var err error conn, err = grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) gm.Expect(err).NotTo(gm.HaveOccurred()) - baseTime = timestamp.NowMilli() + ns := timestamp.NowMilli().UnixNano() + baseTime = time.Unix(0, ns-ns%int64(time.Minute)) interval = 500 * time.Millisecond casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval) goods = gleak.Goroutines() @@ -65,7 +66,7 @@ var _ = g.Describe("Query service_cpm_minute", func() { casesMeasureData.VerifyFn(innerGm, helpers.SharedContext{ Connection: conn, BaseTime: baseTime, - }, helpers.Args{Input: "all", Want: "update", Duration: 1 * time.Hour}) - }, flags.EventuallyTimeout) + }, helpers.Args{Input: "all", Want: "update", Duration: 25 * time.Minute, Offset: -20 * time.Minute}) + }, flags.EventuallyTimeout).Should(gm.Succeed()) }) }) diff --git a/test/integration/other/tls_test.go b/test/integration/other/tls_test.go index 9ae5e121..acda8f30 100644 --- a/test/integration/other/tls_test.go +++ b/test/integration/other/tls_test.go @@ -56,7 +56,8 @@ var _ = g.Describe("Query service_cpm_minute", func() { gm.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(creds)), flags.EventuallyTimeout).Should(gm.Succeed()) conn, err = grpchelper.Conn(addr, 
10*time.Second, grpclib.WithTransportCredentials(creds)) gm.Expect(err).NotTo(gm.HaveOccurred()) - baseTime = timestamp.NowMilli() + ns := timestamp.NowMilli().UnixNano() + baseTime = time.Unix(0, ns-ns%int64(time.Minute)) interval = 500 * time.Millisecond casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval) goods = gleak.Goroutines() @@ -71,7 +72,7 @@ var _ = g.Describe("Query service_cpm_minute", func() { casesMeasureData.VerifyFn(innerGm, helpers.SharedContext{ Connection: conn, BaseTime: baseTime, - }, helpers.Args{Input: "all", Duration: 1 * time.Hour}) - }, flags.EventuallyTimeout) + }, helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}) + }, flags.EventuallyTimeout).Should(gm.Succeed()) }) }) diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go index d8521ccf..b514c43b 100644 --- a/test/integration/query/query_suite_test.go +++ b/test/integration/query/query_suite_test.go @@ -74,8 +74,6 @@ var _ = SynchronizedBeforeSuite(func() []byte { casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval) casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval) - casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval) - casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval) casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval) casesmeasuredata.Write(conn, 
"service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval) diff --git a/test/stress/cases/istio/report.md b/test/stress/cases/istio/report.md index 79c4eebd..83d985d6 100644 --- a/test/stress/cases/istio/report.md +++ b/test/stress/cases/istio/report.md @@ -3,47 +3,50 @@ ## Result ```bash -Ran 1 of 1 Specs in 2368.640 seconds +Ran 1 of 1 Specs in 2384.312 seconds SUCCESS! -- 1 Passed | 0 Failed | 0 Pending | 0 Skipped PASS -Ginkgo ran 1 suite in 39m32.259409687s +Ginkgo ran 1 suite in 39m47.81357862s ``` ## Disk Usage ```bash -96M ./measure-minute/shard-0/seg-20230613/block-2023061313/inverted -25M ./measure-minute/shard-0/seg-20230613/block-2023061313/lsm -18M ./measure-minute/shard-0/seg-20230613/block-2023061313/encoded -26M ./measure-minute/shard-0/seg-20230613/block-2023061313/tst -163M ./measure-minute/shard-0/seg-20230613/block-2023061313 -4.0K ./measure-minute/shard-0/seg-20230613/index -163M ./measure-minute/shard-0/seg-20230613 +4.0K ./measure-minute/shard-0/seg-20230626/index +24M ./measure-minute/shard-0/seg-20230626/block-2023062607/lsm +17M ./measure-minute/shard-0/seg-20230626/block-2023062607/encoded +26M ./measure-minute/shard-0/seg-20230626/block-2023062607/tst +66M ./measure-minute/shard-0/seg-20230626/block-2023062607 +66M ./measure-minute/shard-0/seg-20230626 4.4M ./measure-minute/shard-0/series/md -4.4M ./measure-minute/shard-0/series -167M ./measure-minute/shard-0 -96M ./measure-minute/shard-1/seg-20230613/block-2023061313/inverted -24M ./measure-minute/shard-1/seg-20230613/block-2023061313/lsm -20M ./measure-minute/shard-1/seg-20230613/block-2023061313/encoded -26M ./measure-minute/shard-1/seg-20230613/block-2023061313/tst -164M ./measure-minute/shard-1/seg-20230613/block-2023061313 -4.0K ./measure-minute/shard-1/seg-20230613/index -164M ./measure-minute/shard-1/seg-20230613 +2.2M ./measure-minute/shard-0/series/inverted +20K ./measure-minute/shard-0/series/lsm +6.6M 
./measure-minute/shard-0/series +72M ./measure-minute/shard-0 +4.0K ./measure-minute/shard-1/seg-20230626/index +24M ./measure-minute/shard-1/seg-20230626/block-2023062607/lsm +19M ./measure-minute/shard-1/seg-20230626/block-2023062607/encoded +26M ./measure-minute/shard-1/seg-20230626/block-2023062607/tst +68M ./measure-minute/shard-1/seg-20230626/block-2023062607 +68M ./measure-minute/shard-1/seg-20230626 4.4M ./measure-minute/shard-1/series/md -4.4M ./measure-minute/shard-1/series -169M ./measure-minute/shard-1 -336M ./measure-minute -93M ./measure-default/shard-0/seg-20230613/block-2023061313/inverted -79M ./measure-default/shard-0/seg-20230613/block-2023061313/lsm -70M ./measure-default/shard-0/seg-20230613/block-2023061313/encoded -86M ./measure-default/shard-0/seg-20230613/block-2023061313/tst -326M ./measure-default/shard-0/seg-20230613/block-2023061313 -4.0K ./measure-default/shard-0/seg-20230613/index -326M ./measure-default/shard-0/seg-20230613 +2.2M ./measure-minute/shard-1/series/inverted +20K ./measure-minute/shard-1/series/lsm +6.5M ./measure-minute/shard-1/series +74M ./measure-minute/shard-1 +146M ./measure-minute +4.0K ./measure-default/shard-0/seg-20230626/index +79M ./measure-default/shard-0/seg-20230626/block-2023062607/lsm +65M ./measure-default/shard-0/seg-20230626/block-2023062607/encoded +85M ./measure-default/shard-0/seg-20230626/block-2023062607/tst +228M ./measure-default/shard-0/seg-20230626/block-2023062607 +228M ./measure-default/shard-0/seg-20230626 16M ./measure-default/shard-0/series/md -16M ./measure-default/shard-0/series -341M ./measure-default/shard-0 -341M ./measure-default -676M . +2.4M ./measure-default/shard-0/series/inverted +20K ./measure-default/shard-0/series/lsm +18M ./measure-default/shard-0/series +245M ./measure-default/shard-0 +245M ./measure-default +391M . ```
