This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch storage-column in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a04a05f9daa2bcc4d7be0bb108b7353792b573a5 Author: Gao Hongtao <[email protected]> AuthorDate: Sun Dec 10 22:04:56 2023 +0800 Support measure query Signed-off-by: Gao Hongtao <[email protected]> --- banyand/internal/storage/index.go | 46 ++- banyand/internal/storage/index_test.go | 7 +- banyand/internal/storage/storage.go | 9 +- banyand/internal/storage/tsdb.go | 14 +- banyand/measure/block.go | 206 +++++++++-- banyand/measure/block_metadata.go | 134 ++++++- .../{field_flag_test.go => block_reader.go} | 38 +- banyand/measure/block_writer.go | 16 +- banyand/measure/column.go | 78 ++-- banyand/measure/column_metadata.go | 40 ++- banyand/measure/datapoints.go | 5 +- banyand/measure/measure_query.go | 394 ++++++++++++++++++++- banyand/measure/measure_topn.go | 73 ++-- banyand/measure/measure_write.go | 27 +- banyand/measure/part.go | 44 ++- banyand/measure/part_iter.go | 258 ++++++++++++++ banyand/measure/primary_metadata.go | 71 +++- banyand/measure/tstable.go | 159 ++++++++- pkg/bytes/buffer.go | 10 +- pkg/bytes/{buffer.go => resize.go} | 57 +-- pkg/fs/file_system.go | 23 +- pkg/pb/v1/metadata.go | 52 +++ {banyand/internal/storage => pkg/pb/v1}/series.go | 63 +--- .../internal/storage => pkg/pb/v1}/series_test.go | 6 +- {banyand/internal/storage => pkg/pb/v1}/value.go | 92 ++++- .../internal/storage => pkg/pb/v1}/value_test.go | 2 +- pkg/query/executor/interface.go | 4 +- pkg/query/logical/common.go | 6 +- pkg/query/logical/expr.go | 2 +- pkg/query/logical/index_filter.go | 294 ++++++++++++++- .../measure/measure_plan_indexscan_local.go | 253 +++++-------- pkg/query/logical/measure/topn_plan_localscan.go | 145 +++----- pkg/query/logical/schema.go | 4 +- pkg/query/logical/stream/stream_plan_tag_filter.go | 2 +- 34 files changed, 2051 insertions(+), 583 deletions(-) diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index 5081db54..780a2fea 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ 
-26,27 +26,21 @@ import ( "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) func (d *database[T]) IndexDB() IndexDB { return d.index } -func (d *database[T]) Lookup(ctx context.Context, series *Series) (SeriesList, error) { +func (d *database[T]) Lookup(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) { return d.index.searchPrimary(ctx, series) } -// OrderBy specifies the order of the result. -type OrderBy struct { - Index *databasev1.IndexRule - Sort modelv1.Sort -} - type seriesIndex struct { store index.SeriesStore l *logger.Logger @@ -69,8 +63,8 @@ func newSeriesIndex(ctx context.Context, root string) (*seriesIndex, error) { var entityKey = index.FieldKey{} -func (s *seriesIndex) createPrimary(series *Series) (*Series, error) { - if err := series.marshal(); err != nil { +func (s *seriesIndex) createPrimary(series *pbv1.Series) (*pbv1.Series, error) { + if err := series.Marshal(); err != nil { return nil, err } id, err := s.store.Search(series.Buffer) @@ -97,7 +91,7 @@ func (s *seriesIndex) Write(docs index.Documents) error { var rangeOpts = index.RangeOpts{} -func (s *seriesIndex) searchPrimary(ctx context.Context, series *Series) (SeriesList, error) { +func (s *seriesIndex) searchPrimary(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) { var hasAny, hasWildcard bool var prefixIndex int @@ -105,7 +99,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series *Series) (Series if tv == nil { return nil, errors.New("nil tag value") } - if tv.Value == AnyEntry { + if tv == pbv1.AnyTagValue 
{ if !hasAny { hasAny = true prefixIndex = i @@ -123,7 +117,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series *Series) (Series if hasAny { var ss []index.Series if hasWildcard { - if err = series.marshal(); err != nil { + if err = series.Marshal(); err != nil { return nil, err } ss, err = s.store.SearchWildcard(series.Buffer) @@ -133,7 +127,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series *Series) (Series return convertIndexSeriesToSeriesList(ss) } series.EntityValues = series.EntityValues[:prefixIndex] - if err = series.marshal(); err != nil { + if err = series.Marshal(); err != nil { return nil, err } ss, err = s.store.SearchPrefix(series.Buffer) @@ -142,7 +136,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series *Series) (Series } return convertIndexSeriesToSeriesList(ss) } - if err = series.marshal(); err != nil { + if err = series.Marshal(); err != nil { return nil, err } var seriesID common.SeriesID @@ -152,17 +146,17 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series *Series) (Series } if seriesID > 0 { series.ID = seriesID - return SeriesList{series}, nil + return pbv1.SeriesList{series}, nil } return nil, nil } -func convertIndexSeriesToSeriesList(indexSeries []index.Series) (SeriesList, error) { - seriesList := make(SeriesList, 0, len(indexSeries)) +func convertIndexSeriesToSeriesList(indexSeries []index.Series) (pbv1.SeriesList, error) { + seriesList := make(pbv1.SeriesList, 0, len(indexSeries)) for _, s := range indexSeries { - var series Series + var series pbv1.Series series.ID = s.ID - if err := series.unmarshal(s.EntityValues); err != nil { + if err := series.Unmarshal(s.EntityValues); err != nil { return nil, err } seriesList = append(seriesList, &series) @@ -170,13 +164,13 @@ func convertIndexSeriesToSeriesList(indexSeries []index.Series) (SeriesList, err return seriesList, nil } -func (s *seriesIndex) Search(ctx context.Context, series *Series, filter index.Filter, order 
*OrderBy) (SeriesList, error) { +func (s *seriesIndex) Search(ctx context.Context, series *pbv1.Series, filter index.Filter, order *pbv1.OrderBy) (pbv1.SeriesList, error) { seriesList, err := s.searchPrimary(ctx, series) if err != nil { return nil, err } - pl := seriesList.toList() + pl := seriesList.ToList() if filter != nil { var plFilter posting.List plFilter, err = filter.Execute(func(ruleType databasev1.IndexRule_Type) (index.Searcher, error) { @@ -190,7 +184,7 @@ func (s *seriesIndex) Search(ctx context.Context, series *Series, filter index.F } } - if order == nil { + if order == nil && order.Index == nil { return filterSeriesList(seriesList, pl), nil } @@ -205,7 +199,7 @@ func (s *seriesIndex) Search(ctx context.Context, series *Series, filter index.F err = multierr.Append(err, iter.Close()) }() - var sortedSeriesList SeriesList + var sortedSeriesList pbv1.SeriesList for iter.Next() { pv := iter.Val().Value if err = pv.Intersect(pl); err != nil { @@ -222,7 +216,7 @@ func (s *seriesIndex) Search(ctx context.Context, series *Series, filter index.F return sortedSeriesList, err } -func filterSeriesList(seriesList SeriesList, filter posting.List) SeriesList { +func filterSeriesList(seriesList pbv1.SeriesList, filter posting.List) pbv1.SeriesList { for i := 0; i < len(seriesList); i++ { if !filter.Contains(uint64(seriesList[i].ID)) { seriesList = append(seriesList[:i], seriesList[i+1:]...) 
@@ -232,7 +226,7 @@ func filterSeriesList(seriesList SeriesList, filter posting.List) SeriesList { return seriesList } -func appendSeriesList(dest, src SeriesList, filter posting.List) SeriesList { +func appendSeriesList(dest, src pbv1.SeriesList, filter posting.List) pbv1.SeriesList { for i := 0; i < len(src); i++ { if !filter.Contains(uint64(src[i].ID)) { continue diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index 5f5fc72a..97280850 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -27,11 +27,12 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" ) -var testSeriesPool SeriesPool +var testSeriesPool pbv1.SeriesPool func TestSeriesIndex_Primary(t *testing.T) { ctx := context.Background() @@ -86,7 +87,7 @@ func TestSeriesIndex_Primary(t *testing.T) { subject: "service_instance_latency", entityValues: []*modelv1.TagValue{ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}}, - {Value: AnyEntry}, + pbv1.AnyTagValue, }, expected: []*modelv1.TagValue{ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}}, @@ -97,7 +98,7 @@ func TestSeriesIndex_Primary(t *testing.T) { name: "Wildcard", subject: "service_instance_latency", entityValues: []*modelv1.TagValue{ - {Value: AnyEntry}, + pbv1.AnyTagValue, {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}}, }, expected: []*modelv1.TagValue{ diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index 0bb912b0..d2dd5d72 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -35,6 +35,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/fs" 
"github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -64,16 +65,16 @@ type SupplyTSDB[T TSTable] func() T type IndexDB interface { Write(docs index.Documents) error - Search(ctx context.Context, series *Series, filter index.Filter, order *OrderBy) (SeriesList, error) + Search(ctx context.Context, series *pbv1.Series, filter index.Filter, order *pbv1.OrderBy) (pbv1.SeriesList, error) } // TSDB allows listing and getting shard details. type TSDB[T TSTable] interface { io.Closer - Register(shardID common.ShardID, series *Series) (*Series, error) - Lookup(ctx context.Context, series *Series) (SeriesList, error) + Register(shardID common.ShardID, series *pbv1.Series) (*pbv1.Series, error) + Lookup(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) CreateTSTableIfNotExist(shardID common.ShardID, ts time.Time) (TSTableWrapper[T], error) - SelectTSTables(shardID common.ShardID, timeRange timestamp.TimeRange) ([]TSTableWrapper[T], error) + SelectTSTables(timeRange timestamp.TimeRange) []TSTableWrapper[T] IndexDB() IndexDB } diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index 8c5817fc..d6fdfcad 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -36,6 +36,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -114,7 +115,7 @@ func OpenTSDB[T TSTable](ctx context.Context, opts TSDBOpts[T]) (TSDB[T], error) return db, nil } -func (d *database[T]) Register(shardID common.ShardID, series *Series) (*Series, error) { +func (d *database[T]) Register(shardID common.ShardID, series *pbv1.Series) (*pbv1.Series, error) { var err error if series, err = 
d.index.createPrimary(series); err != nil { return nil, err @@ -144,11 +145,14 @@ func (d *database[T]) CreateTSTableIfNotExist(shardID common.ShardID, ts time.Ti return d.sLst[shardID].segmentController.createTSTable(timeRange.Start) } -func (d *database[T]) SelectTSTables(shardID common.ShardID, timeRange timestamp.TimeRange) ([]TSTableWrapper[T], error) { - if int(shardID) >= int(atomic.LoadUint32(&d.sLen)) { - return nil, ErrUnknownShard +func (d *database[T]) SelectTSTables(timeRange timestamp.TimeRange) []TSTableWrapper[T] { + var result []TSTableWrapper[T] + d.RLock() + for i := range d.sLst { + result = append(result, d.sLst[i].segmentController.selectTSTables(timeRange)...) } - return d.sLst[shardID].segmentController.selectTSTables(timeRange), nil + d.RUnlock() + return result } func (d *database[T]) registerShard(id int) error { diff --git a/banyand/measure/block.go b/banyand/measure/block.go index dabaf9d5..0f512113 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -21,16 +21,18 @@ import ( "sync" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" ) type block struct { timestamps []int64 - tagFamilies []columnFamily + tagFamilies []ColumnFamily - field columnFamily + field ColumnFamily } func (b *block) reset() { @@ -56,16 +58,16 @@ func (b *block) assertValid() { itemsCount := len(timestamps) tff := b.tagFamilies for _, tf := range tff { - for _, c := range tf.columns { - if len(c.values) != itemsCount { - logger.Panicf("unexpected number of values for tags %q: got %d; want %d", c.name, len(c.values), itemsCount) + for _, c := range tf.Columns { + if len(c.Values) != itemsCount { + logger.Panicf("unexpected number of values for tags %q: got %d; want %d", c.Name, len(c.Values), itemsCount) } } } ff := b.field - for _, f := range 
ff.columns { - if len(f.values) != itemsCount { - logger.Panicf("unexpected number of values for fields %q: got %d; want %d", f.name, len(f.values), itemsCount) + for _, f := range ff.Columns { + if len(f.Values) != itemsCount { + logger.Panicf("unexpected number of values for fields %q: got %d; want %d", f.Name, len(f.Values), itemsCount) } } } @@ -107,23 +109,23 @@ func (b *block) processTagFamilies(tff []nameValues, i int, dataPointsLen int) { } } -func (b *block) processTags(tf nameValues, cf columnFamily, i int, dataPointsLen int) { +func (b *block) processTags(tf nameValues, cf ColumnFamily, i int, dataPointsLen int) { cf.resizeColumns(len(tf.values)) for k, t := range tf.values { - b.processTag(t, cf.columns[k], i, dataPointsLen) + b.processTag(t, cf.Columns[k], i, dataPointsLen) } } -func (b *block) processTag(t *nameValue, c column, i int, dataPointsLen int) { +func (b *block) processTag(t *nameValue, c Column, i int, dataPointsLen int) { c.resizeValues(dataPointsLen) - c.valueType = t.valueType - c.values[i] = t.marshal() + c.ValueType = t.valueType + c.Values[i] = t.marshal() } -func (b *block) resizeTagFamilies(tagFamiliesLen int) []columnFamily { +func (b *block) resizeTagFamilies(tagFamiliesLen int) []ColumnFamily { tff := b.tagFamilies[:0] if n := tagFamiliesLen - cap(tff); n > 0 { - tff = append(tff[:cap(tff)], make([]columnFamily, n)...) + tff = append(tff[:cap(tff)], make([]ColumnFamily, n)...) 
} tff = tff[:tagFamiliesLen] b.tagFamilies = tff @@ -153,32 +155,51 @@ func (b *block) mustWriteTo(sid common.SeriesID, bh *blockMetadata, sw *writers) // Marshal field f := b.field - cc := f.columns + cc := f.Columns chh := bh.field.resizeColumnMetadata(len(cc)) for i := range cc { cc[i].mustWriteTo(&chh[i], &sw.fieldValuesWriter) } } -func (b *block) marshalTagFamily(tf columnFamily, bh *blockMetadata, sw *writers) { - hw, w := sw.getColumnMetadataWriterAndColumnWriter(tf.name) - cc := tf.columns +func (b *block) marshalTagFamily(tf ColumnFamily, bh *blockMetadata, sw *writers) { + hw, w := sw.getColumnMetadataWriterAndColumnWriter(tf.Name) + cc := tf.Columns cfh := getColumnFamilyMetadata() chh := cfh.resizeColumnMetadata(len(cc)) for i := range cc { cc[i].mustWriteTo(&chh[i], w) } bb := longTermBufPool.Get() + defer longTermBufPool.Put(bb) bb.Buf = cfh.marshal(bb.Buf) putColumnFamilyMetadata(cfh) - tfh := bh.getTagFamilyMetadata(tf.name) + tfh := bh.getTagFamilyMetadata(tf.Name) tfh.offset = w.bytesWritten tfh.size = uint64(len(bb.Buf)) if tfh.size > maxTagFamiliesMetadataSize { logger.Panicf("too big columnFamilyMetadataSize: %d bytes; mustn't exceed %d bytes", tfh.size, maxTagFamiliesMetadataSize) } hw.MustWrite(bb.Buf) +} + +func (b *block) unmarshalTagFamily(decoder *encoding.BytesBlockDecoder, tfIndex int, name string, columnFamilyMetadataBlock *dataBlock, metaReader, valueReader fs.Reader) { + bb := longTermBufPool.Get() + bytes.ResizeExact(bb.Buf, int(columnFamilyMetadataBlock.size)) + fs.MustReadData(metaReader, int64(columnFamilyMetadataBlock.offset), bb.Buf) + cfm := getColumnFamilyMetadata() + defer putColumnFamilyMetadata(cfm) + _, err := cfm.unmarshal(bb.Buf) + if err != nil { + logger.Panicf("%s: cannot unmarshal columnFamilyMetadata: %v", metaReader.Path(), err) + } longTermBufPool.Put(bb) + tf := b.tagFamilies[tfIndex] + cc := tf.resizeColumns(len(cfm.columnMetadata)) + for i, c := range cc { + c.mustReadValues(decoder, valueReader, 
cfm.columnMetadata[i], uint64(b.Len())) + } + } func (b *block) uncompressedSizeBytes() uint64 { @@ -189,10 +210,10 @@ func (b *block) uncompressedSizeBytes() uint64 { tff := b.tagFamilies for i := range tff { tf := &tff[i] - nameLen := uint64(len(tf.name)) - for _, c := range tf.columns { - nameLen += uint64(len(c.name)) - for _, v := range c.values { + nameLen := uint64(len(tf.Name)) + for _, c := range tf.Columns { + nameLen += uint64(len(c.Name)) + for _, v := range c.Values { if len(v) > 0 { n += nameLen + uint64(len(v)) } @@ -201,10 +222,10 @@ func (b *block) uncompressedSizeBytes() uint64 { } ff := b.field - for i := range ff.columns { - c := &ff.columns[i] - nameLen := uint64(len(c.name)) - for _, v := range c.values { + for i := range ff.Columns { + c := &ff.Columns[i] + nameLen := uint64(len(c.Name)) + for _, v := range c.Values { if len(v) > 0 { n += nameLen + uint64(len(v)) } @@ -213,19 +234,50 @@ func (b *block) uncompressedSizeBytes() uint64 { return n } +func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm *blockMetadata, opts *QueryOptions) { + b.reset() + + b.timestamps = mustReadTimestampsFrom(b.timestamps, &bm.timestamps, int(bm.count), p.timestamps) + + _ = b.resizeTagFamilies(len(bm.tagFamilies)) + var i int + for name, block := range bm.tagFamilies { + b.unmarshalTagFamily(decoder, i, name, block, p.tagFamilyMetadata[name], p.tagFamilies[name]) + i++ + } + cc := b.field.resizeColumns(len(bm.field.columnMetadata)) + for i, c := range cc { + c.mustReadValues(decoder, p.fieldValues, bm.field.columnMetadata[i], bm.count) + } + +} + func mustWriteTimestampsTo(th *timestampsMetadata, timestamps []int64, sw *writers) { th.reset() bb := longTermBufPool.Get() + defer longTermBufPool.Put(bb) bb.Buf, th.marshalType, th.min = encoding.Int64ListToBytes(bb.Buf[:0], timestamps) if len(bb.Buf) > maxTimestampsBlockSize { - logger.Panicf("BUG: too big block with timestamps: %d bytes; the maximum supported size is %d bytes", len(bb.Buf), 
maxTimestampsBlockSize) + logger.Panicf("too big block with timestamps: %d bytes; the maximum supported size is %d bytes", len(bb.Buf), maxTimestampsBlockSize) } th.max = timestamps[len(timestamps)-1] th.offset = sw.timestampsWriter.bytesWritten th.size = uint64(len(bb.Buf)) sw.timestampsWriter.MustWrite(bb.Buf) - longTermBufPool.Put(bb) + +} + +func mustReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, reader fs.Reader) []int64 { + bb := longTermBufPool.Get() + defer longTermBufPool.Put(bb) + fs.MustReadData(reader, int64(tm.offset), bb.Buf) + var err error + dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.marshalType, tm.min, count) + if err != nil { + logger.Panicf("%s: cannot unmarshal timestamps: %v", reader.Path(), err) + } + return dst } func getBlock() *block { @@ -242,3 +294,97 @@ func putBlock(b *block) { } var blockPool sync.Pool + +type blockCursor struct { + idx int + + timestamps []int64 + + tagFamilies []ColumnFamily + + fields ColumnFamily + + columnValuesDecoder encoding.BytesBlockDecoder + p *part + bm *blockMetadata + opts *QueryOptions +} + +func (bc *blockCursor) reset() { + bc.idx = 0 + bc.p = nil + bc.bm = nil + bc.opts = nil + + bc.timestamps = bc.timestamps[:0] + + tff := bc.tagFamilies + for i := range tff { + tff[i].reset() + } + bc.tagFamilies = tff[:0] + bc.fields.reset() +} + +func (bc *blockCursor) init(p *part, bm *blockMetadata, opts *QueryOptions) { + bc.reset() + bc.p = p + bc.bm = bm + bc.opts = opts +} + +func (bc *blockCursor) loadData(tmpBlock *block) bool { + bc.reset() + tmpBlock.reset() + tmpBlock.mustReadFrom(&bc.columnValuesDecoder, bc.p, bc.bm, bc.opts) + + start, end, ok := findRange(tmpBlock.timestamps, bc.opts.minTimestamp, bc.opts.maxTimestamp) + if !ok { + return false + } + bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[start:end]...) + bc.fields.Name = tmpBlock.field.Name + bc.fields.Columns = append(bc.fields.Columns, tmpBlock.field.Columns[start:end]...) 
+ for _, cf := range tmpBlock.tagFamilies { + tf := ColumnFamily{ + Name: cf.Name, + } + tf.Columns = append(tf.Columns, cf.Columns[start:end]...) + bc.tagFamilies = append(bc.tagFamilies, tf) + } + return true +} + +func findRange(timestamps []int64, min int64, max int64) (int, int, bool) { + start, end := -1, -1 + + for i, t := range timestamps { + if t >= min && start == -1 { + start = i + } + if t <= max { + end = i + } + } + + if start == -1 || end == -1 { + return 0, 0, false + } + + return start, end, true +} + +var blockCursorPool sync.Pool + +func generateBlockCursor() *blockCursor { + v := blockCursorPool.Get() + if v == nil { + return &blockCursor{} + } + return v.(*blockCursor) +} + +func releaseBlockCursor(bc *blockCursor) { + bc.reset() + blockCursorPool.Put(bc) +} diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go index b3c26cd9..a9f82721 100644 --- a/banyand/measure/block_metadata.go +++ b/banyand/measure/block_metadata.go @@ -18,9 +18,12 @@ package measure import ( + "errors" + "fmt" "sync" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" ) @@ -40,11 +43,26 @@ func (h *dataBlock) copyFrom(src *dataBlock) { } func (h *dataBlock) marshal(dst []byte) []byte { - dst = encoding.Uint64ToBytes(dst, h.offset) - dst = encoding.Uint64ToBytes(dst, h.size) + dst = encoding.VarUint64ToBytes(dst, h.offset) + dst = encoding.VarUint64ToBytes(dst, h.size) return dst } +func (h *dataBlock) unmarshal(src []byte) ([]byte, error) { + src, n, err := encoding.BytesToVarUint64(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal offset: %w", err) + } + h.offset = n + + src, n, err = encoding.BytesToVarUint64(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal size: %w", err) + } + h.size = n + return src, nil +} + type blockMetadata struct { seriesID common.SeriesID @@ -99,7 +117,8 @@ func (bh 
*blockMetadata) marshal(dst []byte) []byte { dst = encoding.VarUint64ToBytes(dst, bh.count) dst = bh.timestamps.marshal(dst) dst = encoding.VarUint64ToBytes(dst, uint64(len(bh.tagFamilies))) - for _, cf := range bh.tagFamilies { + for name, cf := range bh.tagFamilies { + dst = encoding.EncodeBytes(dst, convert.StringToBytes(name)) dst = cf.marshal(dst) } if len(bh.field.columnMetadata) > 0 { @@ -108,6 +127,61 @@ func (bh *blockMetadata) marshal(dst []byte) []byte { return dst } +func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) { + if len(src) < 8 { + return nil, errors.New("cannot unmarshal blockMetadata from less than 8 bytes") + } + bh.seriesID = common.SeriesID(encoding.BytesToUint64(src)) + src = src[8:] + src, n, err := encoding.BytesToVarUint64(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: %w", err) + } + bh.uncompressedSizeBytes = n + + src, n, err = encoding.BytesToVarUint64(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal count: %w", err) + } + bh.count = n + src, err = bh.timestamps.unmarshal(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal timestampsMetadata: %w", err) + } + src, n, err = encoding.BytesToVarUint64(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal tagFamilies count: %w", err) + } + var nameBytes []byte + for i := uint64(0); i < n; i++ { + src, nameBytes, err = encoding.DecodeBytes(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err) + } + // TODO: cache dataBlock + tf := &dataBlock{} + src, err = tf.unmarshal(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal tagFamily dataBlock: %w", err) + } + bh.tagFamilies[convert.BytesToString(nameBytes)] = tf + } + if len(src) > 0 { + src, err = bh.field.unmarshal(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal columnFamilyMetadata: %w", err) + } + } + return src, nil +} + +func (bh *blockMetadata) less(other 
*blockMetadata) bool { + if bh.seriesID == other.seriesID { + return bh.timestamps.min < other.timestamps.min + } + return bh.seriesID < other.seriesID +} + func getBlockMetadata() *blockMetadata { v := blockMetadataPool.Get() if v == nil { @@ -151,3 +225,57 @@ func (th *timestampsMetadata) marshal(dst []byte) []byte { dst = append(dst, byte(th.marshalType)) return dst } + +func (th *timestampsMetadata) unmarshal(src []byte) ([]byte, error) { + if len(src) < 25 { + return nil, errors.New("cannot unmarshal timestampsMetadata from less than 25 bytes") + } + th.offset = encoding.BytesToUint64(src) + th.size = encoding.BytesToUint64(src[8:]) + th.min = int64(encoding.BytesToUint64(src)) + src = src[8:] + th.max = int64(encoding.BytesToUint64(src)) + src = src[8:] + th.marshalType = encoding.EncodeType(src[0]) + return src[1:], nil +} + +func unmarshalBlockMetadata(dst []blockMetadata, src []byte) ([]blockMetadata, error) { + dstOrig := dst + for len(src) > 0 { + if len(dst) < cap(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, blockMetadata{}) + } + bm := &dst[len(dst)-1] + tail, err := bm.unmarshal(src) + if err != nil { + return dstOrig, fmt.Errorf("cannot unmarshal blockMetadata entries: %w", err) + } + src = tail + } + if err := validateBlockHeaders(dst[len(dstOrig):]); err != nil { + return dstOrig, err + } + return dst, nil +} + +func validateBlockHeaders(bhs []blockMetadata) error { + for i := 1; i < len(bhs); i++ { + bhCurr := &bhs[i] + bhPrev := &bhs[i-1] + if bhCurr.seriesID < bhPrev.seriesID { + return fmt.Errorf("unexpected blockMetadata with smaller seriesID=%d after bigger seriesID=%d at position %d", &bhCurr.seriesID, &bhPrev.seriesID, i) + } + if bhCurr.seriesID != bhPrev.seriesID { + continue + } + thCurr := bhCurr.timestamps + thPrev := bhPrev.timestamps + if thCurr.min < thPrev.min { + return fmt.Errorf("unexpected blockMetadata with smaller timestamp=%d after bigger timestamp=%d at position %d", thCurr.min, thPrev.min, i) + } + } + 
return nil +} diff --git a/banyand/measure/field_flag_test.go b/banyand/measure/block_reader.go similarity index 51% rename from banyand/measure/field_flag_test.go rename to banyand/measure/block_reader.go index c47e457f..01358087 100644 --- a/banyand/measure/field_flag_test.go +++ b/banyand/measure/block_reader.go @@ -17,24 +17,28 @@ package measure -import ( - "testing" - "time" +import "github.com/apache/skywalking-banyandb/pkg/fs" - "github.com/stretchr/testify/assert" +type reader struct { + r fs.Reader + bytesRead uint64 +} + +func newReader(r fs.Reader) *reader { + return &reader{r: r} +} - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" -) +func (r *reader) reset() { + r.r = nil + r.bytesRead = 0 +} + +func (r *reader) Path() string { + return r.r.Path() +} -func TestEncodeFieldFlag(t *testing.T) { - flag := pbv1.EncoderFieldFlag(&databasev1.FieldSpec{ - EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, - CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, - }, time.Minute) - fieldSpec, interval, err := pbv1.DecodeFieldFlag(flag) - assert.NoError(t, err) - assert.Equal(t, databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, fieldSpec.EncodingMethod) - assert.Equal(t, databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, fieldSpec.CompressionMethod) - assert.Equal(t, time.Minute, interval) +func (r *reader) Read(p []byte) (int, error) { + n, err := r.r.Read(0, p) + r.bytesRead += uint64(n) + return n, err } diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go index ee4f1fe7..994e18bd 100644 --- a/banyand/measure/block_writer.go +++ b/banyand/measure/block_writer.go @@ -235,12 +235,12 @@ func (bsw *blockWriter) MustWriteDataPoints(sid common.SeriesID, timestamps []in bsw.primaryBlockData = bh.marshal(bsw.primaryBlockData) putBlockMetadata(bh) if len(bsw.primaryBlockData) > maxUncompressedPrimaryBlockSize { - 
bsw.mustFlushIndexBlock(bsw.primaryBlockData) + bsw.mustFlushPrimaryBlock(bsw.primaryBlockData) bsw.primaryBlockData = bsw.primaryBlockData[:0] } } -func (bsw *blockWriter) mustFlushIndexBlock(data []byte) { +func (bsw *blockWriter) mustFlushPrimaryBlock(data []byte) { if len(data) > 0 { bsw.primaryBlockMetadata.mustWriteBlock(data, bsw.sidFirst, bsw.minTimestamp, bsw.maxTimestamp, &bsw.streamWriters) bsw.metaData = bsw.primaryBlockMetadata.marshal(bsw.metaData) @@ -258,7 +258,7 @@ func (bsw *blockWriter) Flush(ph *partMetadata) { ph.MinTimestamp = bsw.totalMinTimestamp ph.MaxTimestamp = bsw.totalMaxTimestamp - bsw.mustFlushIndexBlock(bsw.primaryBlockData) + bsw.mustFlushPrimaryBlock(bsw.primaryBlockData) bb := longTermBufPool.Get() bb.Buf = zstd.Compress(bb.Buf[:0], bsw.metaData, 1) @@ -271,17 +271,17 @@ func (bsw *blockWriter) Flush(ph *partMetadata) { bsw.reset() } -func getBlockWriter() *blockWriter { - v := blockStreamWriterPool.Get() +func generateBlockWriter() *blockWriter { + v := blockWriterPool.Get() if v == nil { return &blockWriter{} } return v.(*blockWriter) } -func putBlockStreamWriter(bsw *blockWriter) { +func releaseBlockWriter(bsw *blockWriter) { bsw.reset() - blockStreamWriterPool.Put(bsw) + blockWriterPool.Put(bsw) } -var blockStreamWriterPool sync.Pool +var blockWriterPool sync.Pool diff --git a/banyand/measure/column.go b/banyand/measure/column.go index 33e6ea86..b5502c6f 100644 --- a/banyand/measure/column.go +++ b/banyand/measure/column.go @@ -18,47 +18,48 @@ package measure import ( - "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) -type column struct { - name string - valueType storage.ValueType - values [][]byte +type Column struct { + Name string + ValueType 
pbv1.ValueType + Values [][]byte } -func (c *column) reset() { - c.name = "" +func (c *Column) reset() { + c.Name = "" - values := c.values + values := c.Values for i := range values { values[i] = nil } - c.values = values[:0] + c.Values = values[:0] } -func (c *column) resizeValues(valuesLen int) [][]byte { - values := c.values +func (c *Column) resizeValues(valuesLen int) [][]byte { + values := c.Values if n := valuesLen - cap(values); n > 0 { values = append(values[:cap(values)], make([][]byte, n)...) } values = values[:valuesLen] - c.values = values + c.Values = values return values } -func (c *column) mustWriteTo(ch *columnMetadata, columnWriter *writer) { +func (c *Column) mustWriteTo(ch *columnMetadata, columnWriter *writer) { ch.reset() - ch.name = c.name - ch.valueType = c.valueType + ch.name = c.Name + ch.valueType = c.ValueType // remove value type from values - for i := range c.values { - c.values[i] = c.values[i][1:] + for i := range c.Values { + c.Values[i] = c.Values[i][1:] } // TODO: encoding values based on value type @@ -67,7 +68,7 @@ func (c *column) mustWriteTo(ch *columnMetadata, columnWriter *writer) { defer longTermBufPool.Put(bb) // marshal values - bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], c.values) + bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], c.Values) ch.size = uint64(len(bb.Buf)) if ch.size > maxValuesBlockSize { logger.Panicf("too valuesSize: %d bytes; mustn't exceed %d bytes", ch.size, maxValuesBlockSize) @@ -76,29 +77,48 @@ func (c *column) mustWriteTo(ch *columnMetadata, columnWriter *writer) { columnWriter.MustWrite(bb.Buf) } +func (c *Column) mustReadValues(decoder *encoding.BytesBlockDecoder, reader fs.Reader, cm columnMetadata, count uint64) { + c.Name = cm.name + c.ValueType = cm.valueType + + bb := longTermBufPool.Get() + defer longTermBufPool.Put(bb) + valuesSize := cm.size + if valuesSize > maxValuesBlockSize { + logger.Panicf("%s: block size cannot exceed %d bytes; got %d bytes", reader.Path(), maxValuesBlockSize, 
valuesSize) + } + bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize)) + fs.MustReadData(reader, int64(cm.offset), bb.Buf) + var err error + c.Values, err = decoder.Decode(c.Values[:0], bb.Buf, count) + if err != nil { + logger.Panicf("%s: cannot decode values: %v", reader.Path(), err) + } +} + var longTermBufPool bytes.BufferPool -type columnFamily struct { - name string - columns []column +type ColumnFamily struct { + Name string + Columns []Column } -func (cf *columnFamily) reset() { - cf.name = "" +func (cf *ColumnFamily) reset() { + cf.Name = "" - columns := cf.columns + columns := cf.Columns for i := range columns { columns[i].reset() } - cf.columns = columns[:0] + cf.Columns = columns[:0] } -func (cf *columnFamily) resizeColumns(columnsLen int) []column { - columns := cf.columns +func (cf *ColumnFamily) resizeColumns(columnsLen int) []Column { + columns := cf.Columns if n := columnsLen - cap(columns); n > 0 { - columns = append(columns[:cap(columns)], make([]column, n)...) + columns = append(columns[:cap(columns)], make([]Column, n)...) 
} columns = columns[:columnsLen] - cf.columns = columns + cf.Columns = columns return columns } diff --git a/banyand/measure/column_metadata.go b/banyand/measure/column_metadata.go index 5896580d..a14f7c80 100644 --- a/banyand/measure/column_metadata.go +++ b/banyand/measure/column_metadata.go @@ -18,17 +18,18 @@ package measure import ( + "fmt" "sync" - "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) type columnMetadata struct { dataBlock name string - valueType storage.ValueType + valueType pbv1.ValueType } func (cm *columnMetadata) reset() { @@ -51,6 +52,24 @@ func (cm *columnMetadata) marshal(dst []byte) []byte { return dst } +func (cm *columnMetadata) unmarshal(src []byte) ([]byte, error) { + src, nameBytes, err := encoding.DecodeBytes(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal columnMetadata.name: %w", err) + } + cm.name = convert.BytesToString(nameBytes) + if len(src) < 1 { + return nil, fmt.Errorf("cannot unmarshal columnMetadata.valueType: src is too short") + } + cm.valueType = pbv1.ValueType(src[0]) + src = src[1:] + src, err = cm.dataBlock.unmarshal(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal columnMetadata.dataBlock: %w", err) + } + return src, nil +} + type columnFamilyMetadata struct { columnMetadata []columnMetadata } @@ -94,13 +113,28 @@ func (cfm *columnFamilyMetadata) resizeColumnMetadata(columnMetadataLen int) []c func (cfm *columnFamilyMetadata) marshal(dst []byte) []byte { cms := cfm.columnMetadata - dst = encoding.Uint64ToBytes(dst, uint64(len(cms))) + dst = encoding.VarUint64ToBytes(dst, uint64(len(cms))) for i := range cms { dst = cms[i].marshal(dst) } return dst } +func (cfm *columnFamilyMetadata) unmarshal(src []byte) ([]byte, error) { + src, columnMetadataLen, err := encoding.BytesToVarInt64(src) + if err != nil { + 
return nil, fmt.Errorf("cannot unmarshal columnMetadataLen: %w", err) + } + cms := cfm.resizeColumnMetadata(int(columnMetadataLen)) + for i := range cms { + src, err = cms[i].unmarshal(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal columnMetadata %d: %w", i, err) + } + } + return src, nil +} + func getColumnFamilyMetadata() *columnFamilyMetadata { v := columnFamilyMetadataPool.Get() if v == nil { diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go index 44912cd6..d02f1065 100644 --- a/banyand/measure/datapoints.go +++ b/banyand/measure/datapoints.go @@ -23,13 +23,14 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/index" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/pkg/errors" ) type nameValue struct { name string - valueType storage.ValueType + valueType pbv1.ValueType value []byte valueArr [][]byte } @@ -51,7 +52,7 @@ func (n *nameValue) marshal() []byte { if n.valueArr != nil { var dst []byte for i := range n.valueArr { - if n.valueType == storage.ValueTypeInt64Arr { + if n.valueType == pbv1.ValueTypeInt64Arr { dst = append(dst, n.valueArr[i]...) 
continue } diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go index 5a2dce55..718a1f56 100644 --- a/banyand/measure/measure_query.go +++ b/banyand/measure/measure_query.go @@ -18,7 +18,12 @@ package measure import ( + "container/heap" + "context" + "fmt" "io" + "sort" + "sync" "time" "github.com/pkg/errors" @@ -29,7 +34,10 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) @@ -45,6 +53,7 @@ type Query interface { // Measure allows inspecting measure data points' details. 
type Measure interface { io.Closer + Query(ctx context.Context, opts pbv1.MeasureQueryOptions) (pbv1.MeasureQueryResult, error) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, error) Shard(id common.ShardID) (tsdb.Shard, error) @@ -53,6 +62,7 @@ type Measure interface { GetSchema() *databasev1.Measure GetIndexRules() []*databasev1.IndexRule GetInterval() time.Duration + GetShardNum() uint32 SetSchema(schema *databasev1.Measure) } @@ -66,6 +76,386 @@ func (s *measure) GetInterval() time.Duration { return s.interval } +func (s *measure) GetShardNum() uint32 { + return s.shardNum +} + +type QueryResult struct { + data []*blockCursor + originalData []*blockCursor + lastIndex int + pws []*partWrapper + shouldMergeAll bool + asc bool +} + +const round = time.Millisecond + +func (qr *QueryResult) Pull() *pbv1.Result { + if len(qr.data) == 0 { + return nil + } + // TODO:// Parallel load + tmpBlock := getBlock() + defer putBlock(tmpBlock) + for i := 0; i < len(qr.data); i++ { + if !qr.data[i].loadData(tmpBlock) { + qr.data = append(qr.data[:i], qr.data[i+1:]...) + i-- + } + } + if qr.shouldMergeAll { + if len(qr.data) == 1 { + r := &pbv1.Result{} + bc := qr.data[0] + bc.copyAllTo(r) + return r + } + return qr.merge(round) + } + originalData := qr.data + var lastSeriesID common.SeriesID + for i := qr.lastIndex; i < len(originalData); i++ { + bc := originalData[i] + if lastSeriesID != 0 && lastSeriesID != bc.bm.seriesID { + qr.data = qr.data[:0] + qr.data = append(qr.data, originalData[qr.lastIndex:i]...) 
+ qr.lastIndex = i + return qr.merge(round) + } + lastSeriesID = bc.bm.seriesID + } + if qr.lastIndex < len(originalData)-1 { + qr.data = originalData[qr.lastIndex:] + return qr.merge(round) + } + return nil +} + +func (qr *QueryResult) Release() { + for i, v := range qr.data { + releaseBlockCursor(v) + qr.data[i] = nil + } + qr.data = qr.data[:0] + for i := range qr.pws { + qr.pws[i].decRef() + } + qr.pws = qr.pws[:0] +} + +func (qr QueryResult) Len() int { + return len(qr.data) +} + +func (qr QueryResult) Less(i, j int) bool { + if qr.asc { + return qr.data[i].timestamps[qr.data[i].idx] < qr.data[j].timestamps[qr.data[j].idx] + } + return qr.data[i].timestamps[qr.data[i].idx] > qr.data[j].timestamps[qr.data[j].idx] +} + +func (qr QueryResult) Swap(i, j int) { + qr.data[i], qr.data[j] = qr.data[j], qr.data[i] +} + +func (qr *QueryResult) Push(x interface{}) { + qr.data = append(qr.data, x.(*blockCursor)) +} + +func (qr *QueryResult) Pop() interface{} { + old := qr.data + n := len(old) + x := old[n-1] + qr.data = old[0 : n-1] + return x +} + +func (qr *QueryResult) merge(round time.Duration) *pbv1.Result { + result := &pbv1.Result{} + var lastOriginalTimestamp int64 + r := int64(round) + + for qr.Len() > 0 { + bc := heap.Pop(qr).(*blockCursor) + + roundedTimestamp := (bc.timestamps[bc.idx] / r) * r + + if len(result.Timestamps) > 0 && roundedTimestamp == result.Timestamps[len(result.Timestamps)-1] { + if bc.timestamps[bc.idx] > lastOriginalTimestamp { + lastOriginalTimestamp = bc.timestamps[bc.idx] + } + } else { + bc.copyTo(result) + lastOriginalTimestamp = bc.timestamps[bc.idx] + } + + bc.idx++ + + if bc.idx < len(bc.timestamps) { + heap.Push(qr, bc) + } + } + + return result +} + +type QueryOptions struct { + pbv1.MeasureQueryOptions + minTimestamp int64 + maxTimestamp int64 +} + +func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1.MeasureQueryResult, error) { + if mqo.TimeRange == nil || mqo.Entity == nil { + return nil, 
errors.New("invalid query options: timeRange and series are required") + } + tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable]) + tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange) + sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, EntityValues: mqo.Entity}, mqo.Filter, mqo.Order) + if err != nil { + return nil, err + } + var sids []common.SeriesID + for i := range sl { + sids = append(sids, sl[i].ID) + } + var pws []*partWrapper + var parts []*part + qo := &QueryOptions{ + MeasureQueryOptions: mqo, + minTimestamp: mqo.TimeRange.Start.UnixNano(), + maxTimestamp: mqo.TimeRange.End.UnixNano(), + } + for _, tw := range tabWrappers { + pws, parts = tw.Table().getParts(pws, parts, qo) + } + // TODO: cache tstIter + var tstIter tstIter + originalSids := make([]common.SeriesID, len(sids)) + copy(originalSids, sids) + sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) + tstIter.init(parts, sids, qo) + if tstIter.err != nil { + return nil, fmt.Errorf("cannot init tstIter: %w", tstIter.err) + } + var result QueryResult + for tstIter.NextBlock() { + bc := generateBlockCursor() + p := tstIter.piHeap[0] + bc.init(p.p, p.bm, qo) + result.data = append(result.data, bc) + } + if mqo.Order != nil { + if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { + result.asc = true + } + sidToIndex := make(map[common.SeriesID]int) + for i, si := range originalSids { + sidToIndex[si] = i + } + sort.Slice(result.data, func(i, j int) bool { + return sidToIndex[result.data[i].bm.seriesID] < sidToIndex[result.data[j].bm.seriesID] + }) + } else { + result.asc = true + } + return &result, nil +} + +type tableIterator struct { + sids []common.SeriesID + tsTables []*tsTable +} + +func (si *tableIterator) reset() { + +} + +// func (si *tableIterator) init(seriesTablesList []*seriesTables) { + +// } + +func (si *tableIterator) nextBlock() bool { + return false +} + +func generateTableIterator() *tableIterator 
{ + v := tiPool.Get() + if v == nil { + return &tableIterator{} + } + return v.(*tableIterator) +} + +func releaseTableIterator(ti *tableIterator) { + ti.reset() + tiPool.Put(ti) +} + +var tiPool sync.Pool + +func (bc *blockCursor) copyAllTo(r *pbv1.Result) { + r.Timestamps = bc.timestamps + for _, cf := range bc.tagFamilies { + tf := pbv1.TagFamily{ + Name: cf.Name, + } + for _, c := range cf.Columns { + t := pbv1.Tag{ + Name: c.Name, + } + for _, v := range c.Values { + t.Values = append(t.Values, mustDecodeTagValue(c.ValueType, v)) + } + tf.Tags = append(tf.Tags, t) + } + r.TagFamilies = append(r.TagFamilies, tf) + } + for _, c := range bc.fields.Columns { + f := pbv1.Field{ + Name: c.Name, + } + for _, v := range c.Values { + f.Values = append(f.Values, mustDecodeFieldValue(c.ValueType, v)) + } + r.Fields = append(r.Fields, f) + } +} + +func (bc *blockCursor) copyTo(r *pbv1.Result) { + r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx]) + if len(r.TagFamilies) != len(bc.tagFamilies) { + for _, cf := range bc.tagFamilies { + tf := pbv1.TagFamily{ + Name: cf.Name, + } + for _, c := range cf.Columns { + t := pbv1.Tag{ + Name: c.Name, + } + tf.Tags = append(tf.Tags, t) + } + r.TagFamilies = append(r.TagFamilies, tf) + } + } + for i, cf := range bc.tagFamilies { + for i2, c := range cf.Columns { + r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.ValueType, c.Values[bc.idx])) + } + } + + if len(r.Fields) != len(bc.fields.Columns) { + for _, c := range bc.fields.Columns { + f := pbv1.Field{ + Name: c.Name, + } + r.Fields = append(r.Fields, f) + } + } + for i, c := range bc.fields.Columns { + r.Fields[i].Values = append(r.Fields[i].Values, mustDecodeFieldValue(c.ValueType, c.Values[bc.idx])) + } +} + +func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue { + switch valueType { + case pbv1.ValueTypeInt64: + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + 
Value: convert.BytesToInt64(value), + }, + }, + } + case pbv1.ValueTypeStr: + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: string(value), + }, + }, + } + case pbv1.ValueTypeBinaryData: + data := make([]byte, len(value)) + copy(data, value) + return &modelv1.TagValue{ + Value: &modelv1.TagValue_BinaryData{ + BinaryData: data, + }, + } + case pbv1.ValueTypeInt64Arr: + var values []int64 + for i := 0; i < len(value); i += 8 { + values = append(values, convert.BytesToInt64(value[i:i+8])) + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_IntArray{ + IntArray: &modelv1.IntArray{ + Value: values, + }}} + case pbv1.ValueTypeStrArr: + var values []string + bb := longTermBufPool.Get() + var err error + for len(value) > 0 { + bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], value) + if err != nil { + logger.Panicf("unmarshalVarArray failed: %v", err) + } + values = append(values, string(bb.Buf)) + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_StrArray{ + StrArray: &modelv1.StrArray{ + Value: values, + }}} + default: + logger.Panicf("unsupported value type: %v", valueType) + return nil + } +} + +func mustDecodeFieldValue(valueType pbv1.ValueType, value []byte) *modelv1.FieldValue { + switch valueType { + case pbv1.ValueTypeInt64: + return &modelv1.FieldValue{ + Value: &modelv1.FieldValue_Int{ + Int: &modelv1.Int{ + Value: convert.BytesToInt64(value), + }, + }, + } + case pbv1.ValueTypeFloat64: + return &modelv1.FieldValue{ + Value: &modelv1.FieldValue_Float{ + Float: &modelv1.Float{ + Value: convert.BytesToFloat64(value), + }, + }, + } + case pbv1.ValueTypeStr: + return &modelv1.FieldValue{ + Value: &modelv1.FieldValue_Str{ + Str: &modelv1.Str{ + Value: string(value), + }, + }, + } + case pbv1.ValueTypeBinaryData: + data := make([]byte, len(value)) + copy(data, value) + return &modelv1.FieldValue{ + Value: &modelv1.FieldValue_BinaryData{ + BinaryData: data, + }, + } + default: + logger.Panicf("unsupported value 
type: %v", valueType) + return nil + } +} + func (s *measure) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { panic("implement me") // wrap := func(shards []tsdb.Shard) []tsdb.Shard { @@ -111,10 +501,6 @@ func (s *measure) CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, er // return wrap(db.Shards()), nil } -func formatMeasureCompanionPrefix(measureName, name string) string { - return measureName + "." + name -} - func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) { panic("implement me") // shard, err := s.databaseSupplier.SupplyTSDB().Shard(id) diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go index 15780b54..79263a1c 100644 --- a/banyand/measure/measure_topn.go +++ b/banyand/measure/measure_topn.go @@ -39,7 +39,6 @@ import ( measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/flow" "github.com/apache/skywalking-banyandb/pkg/flow/streaming" @@ -72,7 +71,7 @@ var ( type dataPointWithEntityValues struct { *measurev1.DataPointValue - entityValues tsdb.EntityValues + entityValues []*modelv1.TagValue } type topNStreamingProcessor struct { @@ -144,6 +143,8 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err eventTime := t.downSampleTimeBucket(record.TimestampMillis()) timeBucket := eventTime.Format(timeBucketFormat) var err error + publisher := t.pipeline.NewBatchPublisher() + defer publisher.Close() for group, tuples := range tuplesGroups { if e := t.l.Debug(); e.Enabled() { e.Str("TopN", t.topNSchema.GetMetadata().GetName()). 
@@ -154,13 +155,13 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err for rankNum, tuple := range tuples { fieldValue := tuple.V1.(int64) data := tuple.V2.(flow.StreamRecord).Data().(flow.Data) - err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, group, data, rankNum)) + err = multierr.Append(err, t.writeData(publisher, eventTime, timeBucket, fieldValue, group, data, rankNum)) } } return err } -func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64, +func (t *topNStreamingProcessor) writeData(publisher queue.BatchPublisher, eventTime time.Time, timeBucket string, fieldValue int64, group string, data flow.Data, rankNum int, ) error { var tagValues []*modelv1.TagValue @@ -170,7 +171,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin return errors.New("fail to extract tag values from topN result") } } - entity, entityValues, shardID, err := t.locate(tagValues, rankNum) + series, shardID, err := t.locate(tagValues, rankNum) if err != nil { return err } @@ -197,7 +198,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin }, }, }, - }, data[0].(tsdb.EntityValues)...), + }, data[0].([]*modelv1.TagValue)...), }, }, Fields: []*modelv1.FieldValue{ @@ -211,14 +212,13 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin }, }, }, - ShardId: uint32(shardID), - SeriesHash: tsdb.HashEntity(entity), + ShardId: uint32(shardID), } if t.l.Debug().Enabled() { - iwr.EntityValues = entityValues.Encode() + iwr.EntityValues = series.EntityValues } - message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr) - _, errWritePub := t.pipeline.Publish(apiData.TopicMeasureWrite, message) + message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr) + _, errWritePub := publisher.Publish(apiData.TopicMeasureWrite, message) return errWritePub } @@ -226,34 +226,49 @@ 
func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) tim return time.UnixMilli(eventTimeMillis - eventTimeMillis%t.interval.Milliseconds()) } -func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum int) (tsdb.Entity, tsdb.EntityValues, common.ShardID, error) { +func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum int) (*pbv1.Series, common.ShardID, error) { if len(tagValues) != 0 && len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) { - return nil, nil, 0, errors.New("no enough tag values for the entity") + return nil, 0, errors.New("no enough tag values for the entity") } + // Subject: topN aggregation Name + // // entity prefix - // 1) source measure Name + topN aggregation Name - // 2) sort direction - // 3) rank number - // >4) group tag values if needed - entity := make(tsdb.EntityValues, 1+1+1+len(tagValues)) + // + // 1) sort direction + // 2) rank number + // >3) group tag values if needed + series := &pbv1.Series{ + Subject: t.topNSchema.GetMetadata().GetName(), + EntityValues: make([]*modelv1.TagValue, 1+1+len(tagValues)), + } + // entity prefix - entity[0] = tsdb.StrValue(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(), - t.topNSchema.GetMetadata().GetName())) - entity[1] = tsdb.Int64Value(int64(t.sortDirection.Number())) - entity[2] = tsdb.Int64Value(int64(rankNum)) + series.EntityValues[0] = &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: int64(t.sortDirection), + }, + }, + } + series.EntityValues[1] = &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: int64(rankNum), + }, + }, + } // measureID as sharding key for idx, tagVal := range tagValues { - entity[idx+3] = tagVal + series.EntityValues[idx+3] = tagVal } - e, err := entity.ToEntity() - if err != nil { - return nil, nil, 0, err + if err := series.Marshal(); err != nil { + return nil, 0, fmt.Errorf("fail to marshal series: %w", err) } 
- id, err := partition.ShardID(e.Marshal(), t.m.shardNum) + id, err := partition.ShardID(series.Buffer, t.m.shardNum) if err != nil { - return nil, nil, 0, err + return nil, 0, err } - return e, entity, common.ShardID(id), nil + return series, common.ShardID(id), nil } func (t *topNStreamingProcessor) start() *topNStreamingProcessor { diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go index 94ffbe40..eceaf565 100644 --- a/banyand/measure/measure_write.go +++ b/banyand/measure/measure_write.go @@ -25,13 +25,13 @@ import ( "github.com/apache/skywalking-banyandb/api/common" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -49,10 +49,11 @@ func setUpWriteCallback(l *logger.Logger, schemaRepo *schemaRepo) bus.MessageLis func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *measurev1.InternalWriteRequest) (map[string]*dataPointsInGroup, error) { req := writeEvent.Request - t := req.DataPoint.Timestamp.AsTime().Local() - if err := timestamp.Check(t); err != nil { + tp := req.DataPoint.Timestamp.AsTime().Local() + if err := timestamp.Check(tp); err != nil { return nil, fmt.Errorf("invalid timestamp: %s", err) } + t := timestamp.MToN(tp) ts := uint64(t.UnixNano()) gn := req.Metadata.Group @@ -101,9 +102,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me if fLen > len(stm.schema.GetTagFamilies()) { 
return nil, fmt.Errorf("%s has more tag families than expected", req.Metadata) } - series, err := tsdb.Register(shardID, &storage.Series{ + series, err := tsdb.Register(shardID, &pbv1.Series{ Subject: req.Metadata.Name, - EntityValues: writeEvent.EntityValues, + EntityValues: writeEvent.EntityValues[1:], }) if err != nil { return nil, fmt.Errorf("cannot register series: %w", err) @@ -199,7 +200,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { g := groups[i] for j := range g.tables { dps := g.tables[j] - dps.tsTable.Table().mustAddRows(dps.dataPoints) + dps.tsTable.Table().mustAddDataPoints(dps.dataPoints) dps.tsTable.DecRef() } g.tsdb.IndexDB().Write(g.docs) @@ -215,16 +216,16 @@ func encodeFieldValue(name string, fieldValue *modelv1.FieldValue) *nameValue { nv := &nameValue{name: name} switch fieldValue.GetValue().(type) { case *modelv1.FieldValue_Int: - nv.valueType = storage.ValueTypeInt64 + nv.valueType = pbv1.ValueTypeInt64 nv.value = convert.Int64ToBytes(fieldValue.GetInt().GetValue()) case *modelv1.FieldValue_Float: - nv.valueType = storage.ValueTypeFloat64 + nv.valueType = pbv1.ValueTypeFloat64 nv.value = convert.Float64ToBytes(fieldValue.GetFloat().GetValue()) case *modelv1.FieldValue_Str: - nv.valueType = storage.ValueTypeStr + nv.valueType = pbv1.ValueTypeStr nv.value = []byte(fieldValue.GetStr().GetValue()) case *modelv1.FieldValue_BinaryData: - nv.valueType = storage.ValueTypeBinaryData + nv.valueType = pbv1.ValueTypeBinaryData nv.value = bytes.Clone(fieldValue.GetBinaryData()) } return nv @@ -234,13 +235,13 @@ func encodeTagValue(name string, tagValue *modelv1.TagValue) *nameValue { nv := &nameValue{name: name} switch tagValue.GetValue().(type) { case *modelv1.TagValue_Int: - nv.valueType = storage.ValueTypeInt64 + nv.valueType = pbv1.ValueTypeInt64 nv.value = convert.Int64ToBytes(tagValue.GetInt().GetValue()) case *modelv1.TagValue_Str: - nv.valueType = storage.ValueTypeStr + nv.valueType = pbv1.ValueTypeStr nv.value = 
[]byte(tagValue.GetStr().GetValue()) case *modelv1.TagValue_BinaryData: - nv.valueType = storage.ValueTypeBinaryData + nv.valueType = pbv1.ValueTypeBinaryData nv.value = bytes.Clone(tagValue.GetBinaryData()) case *modelv1.TagValue_IntArray: nv.valueArr = make([][]byte, len(tagValue.GetIntArray().Value)) diff --git a/banyand/measure/part.go b/banyand/measure/part.go index 80085662..dd211449 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -28,6 +28,41 @@ import ( "github.com/apache/skywalking-banyandb/pkg/fs" ) +type part struct { + partMetadata partMetadata + + primaryBlockMetadata []primaryBlockMetadata + + meta fs.Reader + primary fs.Reader + tagFamilyMetadata map[string]fs.Reader + tagFamilies map[string]fs.Reader + timestamps fs.Reader + fieldValues fs.Reader +} + +func openMemPart(mp *memPart) *part { + var p part + p.partMetadata = mp.partMetadata + + p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], newReader(&mp.meta)) + + // Open data files + p.meta = &mp.meta + p.primary = &mp.primary + p.timestamps = &mp.timestamps + p.fieldValues = &mp.fieldValues + if mp.tagFamilies != nil { + p.tagFamilies = make(map[string]fs.Reader) + p.tagFamilyMetadata = make(map[string]fs.Reader) + for name, tf := range mp.tagFamilies { + p.tagFamilies[name] = tf + p.tagFamilyMetadata[name] = mp.tagFamilyMetadata[name] + } + } + return &p +} + type memPart struct { partMetadata partMetadata @@ -83,7 +118,7 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { sort.Sort(dps) - bsw := getBlockWriter() + bsw := generateBlockWriter() bsw.MustInitForMemPart(mp) var sidPrev common.SeriesID uncompressedBlockSizeBytes := uint64(0) @@ -104,7 +139,7 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { } bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:], dps.tagFamilies[indexPrev:], dps.fields[indexPrev:]) bsw.Flush(&mp.partMetadata) - putBlockStreamWriter(bsw) + releaseBlockWriter(bsw) } func 
uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 { @@ -141,10 +176,11 @@ type partWrapper struct { ref int32 mp *memPart + p *part } -func newMemPartWrapper(mp *memPart) *partWrapper { - return &partWrapper{mp: mp, ref: 1} +func newMemPartWrapper(mp *memPart, p *part) *partWrapper { + return &partWrapper{mp: mp, p: p, ref: 1} } func (pw *partWrapper) incRef() { diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go new file mode 100644 index 00000000..be1e7a9a --- /dev/null +++ b/banyand/measure/part_iter.go @@ -0,0 +1,258 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package measure + +import ( + "fmt" + "io" + "os" + "sort" + "strings" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +type partIter struct { + bm *blockMetadata + + p *part + + sids []common.SeriesID + + sidIdx int + + minTimestamp int64 + maxTimestamp int64 + + primaryBlockMetadata []primaryBlockMetadata + + bms []blockMetadata + + compressedPrimaryBuf []byte + indexBuf []byte + + err error +} + +func (ps *partIter) reset() { + ps.bm = nil + ps.p = nil + ps.sids = nil + ps.sidIdx = 0 + ps.primaryBlockMetadata = nil + ps.bms = nil + ps.compressedPrimaryBuf = ps.compressedPrimaryBuf[:0] + ps.indexBuf = ps.indexBuf[:0] + ps.err = nil +} + +var isInTest = func() bool { + return strings.HasSuffix(os.Args[0], ".test") +}() + +func (ps *partIter) init(p *part, sids []common.SeriesID, opt *QueryOptions) { + ps.reset() + ps.p = p + + ps.sids = sids + ps.minTimestamp = opt.minTimestamp + ps.maxTimestamp = opt.maxTimestamp + + ps.primaryBlockMetadata = p.primaryBlockMetadata + + ps.nextSeriesID() +} + +func (ps *partIter) NextBlock() bool { + for { + if ps.err != nil { + return false + } + if len(ps.bms) == 0 { + if !ps.nextBHS() { + return false + } + } + if ps.searchBHS() { + return true + } + } +} + +// Error returns the last error. 
+func (ps *partIter) Error() error { + if ps.err == io.EOF { + return nil + } + return ps.err +} + +func (ps *partIter) nextSeriesID() bool { + if ps.sidIdx >= len(ps.sids) { + ps.err = io.EOF + return false + } + ps.bm.seriesID = ps.sids[ps.sidIdx] + ps.sidIdx++ + return true +} + +func (ps *partIter) skipSeriesIDsSmallerThan(sid common.SeriesID) bool { + sids := ps.sids[ps.sidIdx:] + ps.sidIdx += sort.Search(len(sids), func(i int) bool { + return sid < sids[i] + }) + if ps.sidIdx >= len(ps.sids) { + ps.sidIdx = len(ps.sids) + ps.err = io.EOF + return false + } + ps.bm.seriesID = ps.sids[ps.sidIdx] + ps.sidIdx++ + return true +} + +func (ps *partIter) nextBHS() bool { + for len(ps.primaryBlockMetadata) > 0 { + if !ps.skipSeriesIDsSmallerThan(ps.primaryBlockMetadata[0].seriesID) { + return false + } + ps.primaryBlockMetadata = skipSmallMetaindexRows(ps.primaryBlockMetadata, ps.bm.seriesID) + + mr := &ps.primaryBlockMetadata[0] + ps.primaryBlockMetadata = ps.primaryBlockMetadata[1:] + if ps.bm.seriesID < mr.seriesID { + logger.Panicf("BUG: invariant violation: ps.BlockRef.bh.TSID cannot be smaller than mr.TSID; got %+v vs %+v", &ps.bm.seriesID, &mr.seriesID) + } + + if mr.maxTimestamp < ps.minTimestamp || mr.minTimestamp > ps.maxTimestamp { + continue + } + + bm, err := ps.readPrimaryBlock(mr) + if err != nil { + ps.err = fmt.Errorf("cannot read index block for part %q at offset %d with size %d: %w", + &ps.p.partMetadata, mr.offset, mr.size, err) + return false + } + ps.bms = bm + return true + } + + // No more metaindex rows to search. 
+ ps.err = io.EOF + return false +} + +func skipSmallMetaindexRows(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBlockMetadata { + if sid < pbmIndex[0].seriesID { + logger.Panicf("BUG: invariant violation: tsid cannot be smaller than metaindex[0]; got %d vs %d", sid, &pbmIndex[0].seriesID) + } + + if sid == pbmIndex[0].seriesID { + return pbmIndex + } + + n := sort.Search(len(pbmIndex), func(i int) bool { + return sid > pbmIndex[i].seriesID + }) + if n == 0 { + logger.Panicf("BUG: invariant violation: sort.Search returned 0 for tsid > metaindex[0].TSID; tsid=%+v; metaindex[0].TSID=%+v", + sid, &pbmIndex[0].seriesID) + } + return pbmIndex[n-1:] +} + +func (ps *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata, error) { + ps.compressedPrimaryBuf = bytes.ResizeOver(ps.compressedPrimaryBuf, int(mr.size)) + fs.MustReadData(ps.p.primary, int64(mr.offset), ps.compressedPrimaryBuf) + + var err error + ps.indexBuf, err = zstd.Decompress(ps.indexBuf[:0], ps.compressedPrimaryBuf) + if err != nil { + return nil, fmt.Errorf("cannot decompress index block: %w", err) + } + //TODO: cache bm + bm := make([]blockMetadata, 0) + bm, err = unmarshalBlockMetadata(bm, ps.indexBuf) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal index block: %w", err) + } + return bm, nil +} + +func (ps *partIter) searchBHS() bool { + bhs := ps.bms + for len(bhs) > 0 { + // Skip block headers with tsids smaller than the given sid. + sid := ps.bm.seriesID + if bhs[0].seriesID < sid { + n := sort.Search(len(bhs), func(i int) bool { + return sid > bhs[i].seriesID + }) + if n == len(bhs) { + // Nothing found. + break + } + bhs = bhs[n:] + } + bh := &bhs[0] + + // Invariant: tsid <= bh.TSID + + if bh.seriesID != sid { + // tsid < bh.TSID: no more blocks with the given tsid. + // Proceed to the next (bigger) tsid. + if !ps.skipSeriesIDsSmallerThan(bh.seriesID) { + return false + } + continue + } + + // Found the block with the given tsid. Verify timestamp range. 
+ // While blocks for the same TSID are sorted by MinTimestamp, + // the may contain overlapped time ranges. + // So use linear search instead of binary search. + if bh.timestamps.max < ps.minTimestamp { + // Skip the block with too small timestamps. + bhs = bhs[1:] + continue + } + if bh.timestamps.min > ps.maxTimestamp { + // Proceed to the next tsid, since the remaining blocks + // for the current tsid contain too big timestamps. + if !ps.nextSeriesID() { + return false + } + continue + } + + // Found the tsid block with the matching timestamp range. + // Read it. + ps.bm = bh + + ps.bms = bhs[1:] + return true + } + ps.bms = nil + return false +} diff --git a/banyand/measure/primary_metadata.go b/banyand/measure/primary_metadata.go index 1cb5cac6..4063caf2 100644 --- a/banyand/measure/primary_metadata.go +++ b/banyand/measure/primary_metadata.go @@ -18,9 +18,13 @@ package measure import ( + "fmt" + "io" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/logger" ) type primaryBlockMetadata struct { @@ -52,7 +56,6 @@ func (ph *primaryBlockMetadata) mustWriteBlock(data []byte, sidFirst common.Seri longTermBufPool.Put(bb) } -// marshal appends marshaled ih to dst and returns the result. 
func (ph *primaryBlockMetadata) marshal(dst []byte) []byte { dst = ph.seriesID.AppendToBytes(dst) dst = encoding.Uint64ToBytes(dst, uint64(ph.minTimestamp)) @@ -61,3 +64,69 @@ func (ph *primaryBlockMetadata) marshal(dst []byte) []byte { dst = encoding.Uint64ToBytes(dst, ph.size) return dst } + +func (ph *primaryBlockMetadata) unmarshal(src []byte) ([]byte, error) { + if len(src) < 40 { + return nil, fmt.Errorf("cannot unmarshal primaryBlockMetadata from %d bytes; expect at least 40 bytes", len(src)) + } + ph.seriesID = common.SeriesID(encoding.BytesToUint64(src)) + src = src[8:] + ph.minTimestamp = int64(encoding.BytesToUint64(src)) + src = src[8:] + ph.maxTimestamp = int64(encoding.BytesToUint64(src)) + src = src[8:] + ph.offset = encoding.BytesToUint64(src) + src = src[8:] + ph.size = encoding.BytesToUint64(src) + return src[8:], nil +} + +func mustReadPrimaryBlockMetadata(dst []primaryBlockMetadata, r *reader) []primaryBlockMetadata { + data, err := io.ReadAll(r) + if err != nil { + logger.Panicf("cannot read primaryBlockMetadata entries from %s: %s", r.Path(), err) + } + + bb := longTermBufPool.Get() + bb.Buf, err = zstd.Decompress(bb.Buf[:0], data) + if err != nil { + logger.Panicf("cannot decompress indexBlockHeader entries from %s: %s", r.Path(), err) + } + dst, err = unmarshalPrimaryBlockMetadata(dst, bb.Buf) + longTermBufPool.Put(bb) + if err != nil { + logger.Panicf("cannot parse indexBlockHeader entries from %s: %s", r.Path(), err) + } + return dst +} + +// unmarshalPrimaryBlockMetadata appends unmarshaled from src indexBlockHeader entries to dst and returns the result. 
+func unmarshalPrimaryBlockMetadata(dst []primaryBlockMetadata, src []byte) ([]primaryBlockMetadata, error) {
+	dstOrig := dst
+	for len(src) > 0 {
+		// Reuse spare capacity of dst when possible to avoid reallocations.
+		if len(dst) < cap(dst) {
+			dst = dst[:len(dst)+1]
+		} else {
+			dst = append(dst, primaryBlockMetadata{})
+		}
+		pbm := &dst[len(dst)-1]
+		tail, err := pbm.unmarshal(src)
+		if err != nil {
+			return dstOrig, fmt.Errorf("cannot unmarshal primaryBlockMetadata entry #%d: %w", len(dst)-len(dstOrig), err)
+		}
+		src = tail
+	}
+	if err := validatePrimaryBlockMetadata(dst[len(dstOrig):]); err != nil {
+		return dstOrig, err
+	}
+	return dst, nil
+}
+
+// validatePrimaryBlockMetadata verifies that the entries are sorted by seriesID
+// in ascending order; readers rely on this invariant for binary search.
+func validatePrimaryBlockMetadata(pbms []primaryBlockMetadata) error {
+	for i := 1; i < len(pbms); i++ {
+		if pbms[i].seriesID < pbms[i-1].seriesID {
+			return fmt.Errorf("unexpected primaryBlockMetadata with smaller seriesID=%d after bigger seriesID=%d", pbms[i].seriesID, pbms[i-1].seriesID)
+		}
+	}
+	return nil
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 5906742d..35b96d02 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+	"container/heap"
 	"fmt"
 	"io"
 	"path"
@@ -229,17 +230,171 @@ func (tst *tsTable) Close() error {
 	panic("implement me")
 }
 
-func (tst *tsTable) mustAddRows(dps *dataPoints) {
+func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
 	if len(dps.seriesIDs) == 0 {
 		return
 	}
 	mp := getMemPart()
 	mp.mustInitFromDataPoints(dps)
+	p := openMemPart(mp)
 
-	pw := newMemPartWrapper(mp)
+	pw := newMemPartWrapper(mp, p)
 	tst.Lock()
 	defer tst.Unlock()
 	tst.memParts = append(tst.memParts, pw)
 }
+
+// getParts appends the memory parts whose time range overlaps opts to dst and
+// dstPart, incrementing each selected part's reference count so it stays alive
+// for the duration of the query.
+func (tst *tsTable) getParts(dst []*partWrapper, dstPart []*part, opts *QueryOptions) ([]*partWrapper, []*part) {
+	tst.RLock()
+	defer tst.RUnlock()
+	for _, p := range tst.memParts {
+		pm := p.mp.partMetadata
+		if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp > pm.MaxTimestamp {
+			continue
+		}
+		p.incRef()
+		dst = append(dst, p)
+		dstPart = append(dstPart, p.p)
+	}
+	return dst, dstPart
+}
+
+// tstIter merges the block streams of several parts through a min-heap of
+// partIter values, yielding blocks in (seriesID, timestamp) order.
+type tstIter struct {
+	queryOpts *QueryOptions
+
+	parts []*part
+
+	piPool []partIter
+	piHeap partIterHeap
+
+	err error
+
+	nextBlockNoop bool
+}
+
+// reset returns ti to its zero state so it can be reused without reallocating
+// the pooled slices.
+func (ti *tstIter) reset() {
+	for i := range ti.parts {
+		ti.parts[i] = nil
+	}
+	ti.parts = ti.parts[:0]
+
+	for i := range ti.piPool {
+		ti.piPool[i].reset()
+	}
+	ti.piPool = ti.piPool[:0]
+
+	for i := range ti.piHeap {
+		ti.piHeap[i] = nil
+	}
+	ti.piHeap = ti.piHeap[:0]
+
+	ti.err = nil
+	ti.nextBlockNoop = false
+}
+
+// init prepares ti to iterate over the given parts, restricted to the series
+// ids in sids and the time range in queryOpts. On success the first block is
+// already positioned; the first NextBlock call is a no-op.
+func (ti *tstIter) init(parts []*part, sids []common.SeriesID, queryOpts *QueryOptions) {
+	ti.reset()
+	ti.parts = parts
+	ti.queryOpts = queryOpts
+
+	if n := len(ti.parts) - cap(ti.piPool); n > 0 {
+		ti.piPool = append(ti.piPool[:cap(ti.piPool)], make([]partIter, n)...)
+	}
+	ti.piPool = ti.piPool[:len(ti.parts)]
+	for i, p := range ti.parts {
+		ti.piPool[i].init(p, sids, queryOpts)
+	}
+
+	ti.piHeap = ti.piHeap[:0]
+	for i := range ti.piPool {
+		ps := &ti.piPool[i]
+		if !ps.NextBlock() {
+			if err := ps.Error(); err != nil {
+				ti.err = fmt.Errorf("cannot initialize tstable iteration: %w", err)
+				return
+			}
+			// This part has no matching blocks; drop it from the heap.
+			continue
+		}
+		ti.piHeap = append(ti.piHeap, ps)
+	}
+	if len(ti.piHeap) == 0 {
+		ti.err = io.EOF
+		return
+	}
+	heap.Init(&ti.piHeap)
+	ti.nextBlockNoop = true
+}
+
+// NextBlock advances to the next block across all parts. It returns false when
+// the iteration is exhausted or an error occurred; check Error afterwards.
+func (ti *tstIter) NextBlock() bool {
+	if ti.err != nil {
+		return false
+	}
+	if ti.nextBlockNoop {
+		ti.nextBlockNoop = false
+		return true
+	}
+
+	ti.err = ti.nextBlock()
+	if ti.err != nil {
+		if ti.err != io.EOF {
+			ti.err = fmt.Errorf("cannot obtain the next block to search in the partition: %w", ti.err)
+		}
+		return false
+	}
+	return true
+}
+
+func (ti *tstIter) nextBlock() error {
+	psMin := ti.piHeap[0]
+	if psMin.NextBlock() {
+		heap.Fix(&ti.piHeap, 0)
+		return nil
+	}
+
+	if err := psMin.Error(); err != nil {
+		return err
+	}
+
+	heap.Pop(&ti.piHeap)
+
+	if len(ti.piHeap) == 0 {
+		return io.EOF
+	}
+	return nil
+}
+
+// Error returns the terminal error of the iteration, if any.
+// io.EOF marks normal exhaustion and is not reported as an error.
+func (ti *tstIter) Error() error {
+	if ti.err == io.EOF {
+		return nil
+	}
+	return ti.err
+}
+
+// partIterHeap is a min-heap of part iterators ordered by their current
+// block metadata; it implements heap.Interface.
+type partIterHeap []*partIter
+
+func (pih *partIterHeap) Len() int {
+	return len(*pih)
+}
+
+func (pih *partIterHeap) Less(i, j int) bool {
+	x := *pih
+	return x[i].bm.less(x[j].bm)
+}
+
+func (pih *partIterHeap) Swap(i, j int) {
+	x := *pih
+	x[i], x[j] = x[j], x[i]
+}
+
+func (pih *partIterHeap) Push(x any) {
+	*pih = append(*pih, x.(*partIter))
+}
+
+func (pih *partIterHeap) Pop() any {
+	a := *pih
+	v := a[len(a)-1]
+	*pih = a[:len(a)-1]
+	return v
+}
diff --git a/pkg/bytes/buffer.go b/pkg/bytes/buffer.go
index 369a9d62..8e6ffa9e 100644
--- a/pkg/bytes/buffer.go
+++ b/pkg/bytes/buffer.go
@@ -24,7 +24,10 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/fs"
 )
 
-var _ fs.Writer = (*Buffer)(nil)
+var (
+	_ fs.Writer = (*Buffer)(nil)
+	_ fs.Reader = (*Buffer)(nil)
+)
 
 type Buffer struct {
 	Buf []byte
@@ -46,6 +49,11 @@ func (b *Buffer) Write(bb []byte) (int, error) {
 	return len(bb), nil
 }
 
+// Read implements fs.Reader. It copies bytes starting at offset into buffer
+// and reports how many bytes were copied. An out-of-range offset is reported
+// as an error instead of panicking on the slice expression.
+func (b *Buffer) Read(offset int64, buffer []byte) (int, error) {
+	if offset < 0 || offset > int64(len(b.Buf)) {
+		return 0, fmt.Errorf("offset %d out of range [0, %d]", offset, len(b.Buf))
+	}
+	return copy(buffer, b.Buf[offset:]), nil
+}
+
 func (b *Buffer) Reset() {
 	b.Buf = b.Buf[:0]
 }
diff --git a/pkg/bytes/buffer.go b/pkg/bytes/resize.go
similarity index 53%
copy from pkg/bytes/buffer.go
copy to pkg/bytes/resize.go
index 369a9d62..24a0a975 100644
--- a/pkg/bytes/buffer.go
+++ b/pkg/bytes/resize.go
@@ -17,52 +17,25 @@
 
 package bytes
 
-import (
-	"fmt"
-	"sync"
+import "math/bits"
 
-	"github.com/apache/skywalking-banyandb/pkg/fs"
-)
-
-var _ fs.Writer = (*Buffer)(nil)
-
-type Buffer struct {
-	Buf []byte
-}
-
-// Close implements fs.Writer.
-func (*Buffer) Close() error {
-	return nil
-}
-
-// Path implements fs.Writer.
-func (b *Buffer) Path() string {
-	return fmt.Sprintf("mem/%p", b)
-}
-
-// Write implements fs.Writer.
-func (b *Buffer) Write(bb []byte) (int, error) {
-	b.Buf = append(b.Buf, bb...)
-	return len(bb), nil
-}
-
-func (b *Buffer) Reset() {
-	b.Buf = b.Buf[:0]
-}
-
-type BufferPool struct {
-	p sync.Pool
+// ResizeOver resizes b to n bytes, over-allocating the backing array to the
+// next power of two to amortize future growth. The contents of b are NOT
+// preserved when the backing array is reallocated.
+func ResizeOver(b []byte, n int) []byte {
+	if n <= cap(b) {
+		return b[:n]
+	}
+	nNew := roundToNearestPow2(n)
+	bNew := make([]byte, nNew)
+	return bNew[:n]
 }
 
-func (bp *BufferPool) Get() *Buffer {
-	bbv := bp.p.Get()
-	if bbv == nil {
-		return &Buffer{}
+// ResizeExact resizes b to exactly n bytes. The contents of b are NOT
+// preserved when the backing array is reallocated.
+func ResizeExact(b []byte, n int) []byte {
+	if n <= cap(b) {
+		return b[:n]
 	}
-	return bbv.(*Buffer)
+	return make([]byte, n)
 }
 
-func (bp *BufferPool) Put(b *Buffer) {
-	b.Reset()
-	bp.p.Put(b)
+// roundToNearestPow2 returns the smallest power of two that is >= n.
+func roundToNearestPow2(n int) int {
+	pow2 := uint8(bits.Len(uint(n - 1)))
+	return 1 << pow2
 }
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 2010e26e..08837991 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -45,6 +45,15 @@ type Writer interface {
 	Close() error
 }
 
+// Reader allows reading a file at arbitrary offsets.
+type Reader interface {
+	// Reading a specified location of file.
+	Read(offset int64, buffer []byte) (int, error)
+	// Returns the absolute path of the file.
+	Path() string
+	// Close File.
+	Close() error
+}
+
 type Closer interface {
 	// Returns the absolute path of the file.
 	Path() string
@@ -55,10 +64,9 @@ type Closer interface {
 // File operation interface.
 type File interface {
 	Writer
+	Reader
 	// Vector Append mode, which supports appending consecutive buffers to the end of the file.
 	Writev(iov *[][]byte) (int, error)
-	// Reading a specified location of file.
-	Read(offset int64, buffer []byte) (int, error)
 	// Reading contiguous regions of a file and dispersing them into discontinuous buffers.
 	Readv(offset int64, iov *[][]byte) (int, error)
 	// Read the entire file using streaming read.
@@ -114,6 +122,17 @@ func MustWriteData(w Writer, data []byte) {
 	}
 }
 
+// MustReadData reads data from r and panics if it cannot read all data.
+func MustReadData(r Reader, offset int64, buff []byte) { + n, err := r.Read(offset, buff) + if err != nil { + logger.GetLogger().Panic().Err(err).Str("path", r.Path()).Msg("cannot read data") + } + if n != len(buff) { + logger.GetLogger().Panic().Int("read", n).Int("expected", len(buff)).Str("path", r.Path()).Msg("BUG: reader read wrong number of bytes") + } +} + func MustClose(c Closer) { err := c.Close() if err != nil { diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go index 22382972..a0055009 100644 --- a/pkg/pb/v1/metadata.go +++ b/pkg/pb/v1/metadata.go @@ -21,6 +21,8 @@ package v1 import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) // FindTagByName finds TagSpec in several tag families by its name. @@ -70,3 +72,53 @@ func FieldValueTypeConv(fieldValue *modelv1.FieldValue) (tagType databasev1.Fiel } return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, false } + +type OrderBy struct { + Index *databasev1.IndexRule + Sort modelv1.Sort +} + +// AnyEntry is the `*` for a regular expression. It could match "any" Entry in an Entity. 
+var anyEntry = &modelv1.TagValue_Null{} +var AnyTagValue = &modelv1.TagValue{Value: anyEntry} + +type Tag struct { + Name string + Values []*modelv1.TagValue +} + +type TagFamily struct { + Name string + Tags []Tag +} + +type Field struct { + Name string + Values []*modelv1.FieldValue +} + +type Result struct { + Timestamps []int64 + TagFamilies []TagFamily + Fields []Field +} + +type TagProjection struct { + Family string + Name string +} + +type MeasureQueryOptions struct { + Name string + TimeRange *timestamp.TimeRange + Entity []*modelv1.TagValue + Filter index.Filter + Order *OrderBy + TagProjection []TagProjection + FieldProjection []string +} + +type MeasureQueryResult interface { + Pull() *Result + Release() +} diff --git a/banyand/internal/storage/series.go b/pkg/pb/v1/series.go similarity index 70% rename from banyand/internal/storage/series.go rename to pkg/pb/v1/series.go index 19bf4797..072e3290 100644 --- a/banyand/internal/storage/series.go +++ b/pkg/pb/v1/series.go @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -package storage +package v1 import ( - "bytes" "sort" "sync" @@ -38,7 +37,7 @@ type Series struct { ID common.SeriesID } -func (s *Series) marshal() error { +func (s *Series) Marshal() error { s.Buffer = marshalEntityValue(s.Buffer, convert.StringToBytes(s.Subject)) var err error for _, tv := range s.EntityValues { @@ -50,7 +49,7 @@ func (s *Series) marshal() error { return nil } -func (s *Series) unmarshal(src []byte) error { +func (s *Series) Unmarshal(src []byte) error { var err error s.Buffer = s.Buffer[:0] if s.Buffer, src, err = unmarshalEntityValue(s.Buffer, src); err != nil { @@ -92,60 +91,6 @@ func (sp *SeriesPool) Put(s *Series) { sp.pool.Put(s) } -// AnyEntry is the `*` for a regular expression. It could match "any" Entry in an Entity. 
-var AnyEntry = &modelv1.TagValue_Null{} - -const ( - entityDelimiter = '|' - escape = '\\' -) - -var anyWildcard = []byte{'*'} - -func marshalEntityValue(dest, src []byte) []byte { - if src == nil { - dest = append(dest, entityDelimiter) - return dest - } - if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, escape) < 0 { - dest = append(dest, src...) - dest = append(dest, entityDelimiter) - return dest - } - for _, b := range src { - if b == entityDelimiter || b == escape { - dest = append(dest, escape) - } - dest = append(dest, b) - } - dest = append(dest, entityDelimiter) - return dest -} - -func unmarshalEntityValue(dest, src []byte) ([]byte, []byte, error) { - if len(src) == 0 { - return nil, nil, errors.New("empty entity value") - } - if src[0] == entityDelimiter { - return dest, src[1:], nil - } - for len(src) > 0 { - if src[0] == escape { - if len(src) < 2 { - return nil, nil, errors.New("invalid escape character") - } - src = src[1:] - dest = append(dest, src[0]) - } else if src[0] == entityDelimiter { - return dest, src[1:], nil - } else { - dest = append(dest, src[0]) - } - src = src[1:] - } - return nil, nil, errors.New("invalid entity value") -} - // SeriesList is a collection of Series. type SeriesList []*Series @@ -195,7 +140,7 @@ func (a SeriesList) Merge(other SeriesList) SeriesList { return final } -func (a SeriesList) toList() posting.List { +func (a SeriesList) ToList() posting.List { pl := roaring.NewPostingList() for _, v := range a { pl.Insert(uint64(v.ID)) diff --git a/banyand/internal/storage/series_test.go b/pkg/pb/v1/series_test.go similarity index 97% rename from banyand/internal/storage/series_test.go rename to pkg/pb/v1/series_test.go index f4a11e3c..c44c5a89 100644 --- a/banyand/internal/storage/series_test.go +++ b/pkg/pb/v1/series_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package storage +package v1 import ( "testing" @@ -97,7 +97,7 @@ func TestMarshalAndUnmarshalSeries(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Test Series.Marshal - err := tt.src.marshal() + err := tt.src.Marshal() // Add assertions assert.NoError(t, err) @@ -107,7 +107,7 @@ func TestMarshalAndUnmarshalSeries(t *testing.T) { // Test Series.Unmarshal tt.src.reset() - err = tt.src.unmarshal(marshaled) + err = tt.src.Unmarshal(marshaled) // Add assertions assert.NoError(t, err) diff --git a/banyand/internal/storage/value.go b/pkg/pb/v1/value.go similarity index 63% rename from banyand/internal/storage/value.go rename to pkg/pb/v1/value.go index 181f2a3d..d25dde07 100644 --- a/banyand/internal/storage/value.go +++ b/pkg/pb/v1/value.go @@ -15,12 +15,15 @@ // specific language governing permissions and limitations // under the License. -package storage +package v1 import ( + "bytes" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/pkg/errors" ) @@ -54,7 +57,7 @@ func MustTagValueToValueType(tag *modelv1.TagValue) ValueType { } func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) { - if tv.Value == AnyEntry { + if tv == AnyTagValue { dest = marshalEntityValue(dest, anyWildcard) return dest, nil } @@ -116,3 +119,88 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte, []byte, *modelv1.TagVal } return nil, nil, nil, errors.New("unsupported tag value type") } + +const ( + entityDelimiter = '|' + escape = '\\' +) + +var anyWildcard = []byte{'*'} + +func marshalEntityValue(dest, src []byte) []byte { + if src == nil { + dest = append(dest, entityDelimiter) + return dest + } + if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, escape) < 0 { + dest = append(dest, src...) 
+ dest = append(dest, entityDelimiter) + return dest + } + for _, b := range src { + if b == entityDelimiter || b == escape { + dest = append(dest, escape) + } + dest = append(dest, b) + } + dest = append(dest, entityDelimiter) + return dest +} + +func unmarshalEntityValue(dest, src []byte) ([]byte, []byte, error) { + if len(src) == 0 { + return nil, nil, errors.New("empty entity value") + } + if src[0] == entityDelimiter { + return dest, src[1:], nil + } + for len(src) > 0 { + if src[0] == escape { + if len(src) < 2 { + return nil, nil, errors.New("invalid escape character") + } + src = src[1:] + dest = append(dest, src[0]) + } else if src[0] == entityDelimiter { + return dest, src[1:], nil + } else { + dest = append(dest, src[0]) + } + src = src[1:] + } + return nil, nil, errors.New("invalid entity value") +} + +func MustCompareTagValue(tv1, tv2 *modelv1.TagValue) int { + if tv1 == nil && tv2 == nil { + return 0 + } + if tv1 == nil { + return -1 + } + if tv2 == nil { + return 1 + } + if tv1 == AnyTagValue { + return 1 + } + if tv2 == AnyTagValue { + return -1 + } + vt1 := MustTagValueToValueType(tv1) + vt2 := MustTagValueToValueType(tv2) + if vt1 != vt2 { + logger.Panicf("inconsistent tag value type: %v vs %v", vt1, vt2) + } + switch vt1 { + case ValueTypeStr: + return bytes.Compare(convert.StringToBytes(tv1.GetStr().Value), convert.StringToBytes(tv2.GetStr().Value)) + case ValueTypeInt64: + return int(tv1.GetInt().Value - tv2.GetInt().Value) + case ValueTypeBinaryData: + return bytes.Compare(tv1.GetBinaryData(), tv2.GetBinaryData()) + default: + logger.Panicf("unsupported tag value type: %v", vt1) + return 0 + } +} diff --git a/banyand/internal/storage/value_test.go b/pkg/pb/v1/value_test.go similarity index 99% rename from banyand/internal/storage/value_test.go rename to pkg/pb/v1/value_test.go index 950cb99f..17f77fed 100644 --- a/banyand/internal/storage/value_test.go +++ b/pkg/pb/v1/value_test.go @@ -15,7 +15,7 @@ // specific language governing permissions 
and limitations // under the License. -package storage +package v1 import ( "testing" diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go index a206293b..1c5eeb82 100644 --- a/pkg/query/executor/interface.go +++ b/pkg/query/executor/interface.go @@ -27,6 +27,7 @@ import ( streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/bus" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) // ExecutionContext allows retrieving data from tsdb. @@ -64,8 +65,7 @@ type StreamExecutable interface { // MeasureExecutionContext allows retrieving data through the measure module. type MeasureExecutionContext interface { - ExecutionContext - ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, error) + Query(ctx context.Context, opts pbv1.MeasureQueryOptions) (pbv1.MeasureQueryResult, error) } // MeasureExecutionContextKey is the key of measure execution context in context.Context. 
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go index 9d197df4..6c3eb7e0 100644 --- a/pkg/query/logical/common.go +++ b/pkg/query/logical/common.go @@ -59,7 +59,7 @@ func ProjectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRe if len(refs) == 0 { continue } - familyName := refs[0].Tag.getFamilyName() + familyName := refs[0].Tag.GetFamilyName() parsedTagFamily, err := ec.ParseTagFamily(familyName, item) if err != nil { return nil, errors.WithMessage(err, "parse projection") @@ -173,11 +173,11 @@ func (t *Tag) GetCompoundName() string { return t.familyName + ":" + t.name } -func (t *Tag) getTagName() string { +func (t *Tag) GetTagName() string { return t.name } -func (t *Tag) getFamilyName() string { +func (t *Tag) GetFamilyName() string { return t.familyName } diff --git a/pkg/query/logical/expr.go b/pkg/query/logical/expr.go index b750dcde..ed0bdeda 100644 --- a/pkg/query/logical/expr.go +++ b/pkg/query/logical/expr.go @@ -37,7 +37,7 @@ type TagRef struct { // Equal reports whether f and expr have the same name and data type. 
func (f *TagRef) Equal(expr Expr) bool { if other, ok := expr.(*TagRef); ok { - return other.Tag.getTagName() == f.Tag.getTagName() && other.Spec.Spec.GetType() == f.Spec.Spec.GetType() + return other.Tag.GetTagName() == f.Tag.GetTagName() && other.Spec.Spec.GetType() == f.Spec.Spec.GetType() } return false } diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go index 4ab9ff01..78a210c5 100644 --- a/pkg/query/logical/index_filter.go +++ b/pkg/query/logical/index_filter.go @@ -32,6 +32,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) var errInvalidLogicalExpression = errors.New("invalid logical expression") @@ -45,9 +46,9 @@ type GlobalIndexError struct { func (g GlobalIndexError) Error() string { return g.IndexRule.String() } -// BuildLocalFilter returns a new index.Filter for local indices. +// BuildLocalFilterDeprecated returns a new index.Filter for local indices. // It could parse series Path at the same time. 
-func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[string]int, +func BuildLocalFilterDeprecated(criteria *modelv1.Criteria, schema Schema, entityDict map[string]int, entity tsdb.Entity, mandatoryIndexRule bool, ) (index.Filter, []tsdb.Entity, error) { if criteria == nil { @@ -56,7 +57,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ switch criteria.GetExp().(type) { case *modelv1.Criteria_Condition: cond := criteria.GetCondition() - expr, parsedEntity, err := parseExprOrEntity(entityDict, entity, cond) + expr, parsedEntity, err := parseExprOrEntityDeprecated(entityDict, entity, cond) if err != nil { return nil, nil, err } @@ -75,7 +76,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ return nil, nil, errors.Wrapf(errUnsupportedConditionOp, "gobal index conf:%s", cond) } } - return parseCondition(cond, indexRule, expr, entity) + return parseConditionDeprecated(cond, indexRule, expr, entity) } else if mandatoryIndexRule { return nil, nil, errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond) } @@ -86,20 +87,20 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ return nil, nil, errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) } if le.GetLeft() == nil { - return BuildLocalFilter(le.Right, schema, entityDict, entity, mandatoryIndexRule) + return BuildLocalFilterDeprecated(le.Right, schema, entityDict, entity, mandatoryIndexRule) } if le.GetRight() == nil { - return BuildLocalFilter(le.Left, schema, entityDict, entity, mandatoryIndexRule) + return BuildLocalFilterDeprecated(le.Left, schema, entityDict, entity, mandatoryIndexRule) } - left, leftEntities, err := BuildLocalFilter(le.Left, schema, entityDict, entity, mandatoryIndexRule) + left, leftEntities, err := BuildLocalFilterDeprecated(le.Left, schema, entityDict, entity, mandatoryIndexRule) if err != nil 
{ return nil, nil, err } - right, rightEntities, err := BuildLocalFilter(le.Right, schema, entityDict, entity, mandatoryIndexRule) + right, rightEntities, err := BuildLocalFilterDeprecated(le.Right, schema, entityDict, entity, mandatoryIndexRule) if err != nil { return nil, nil, err } - entities := parseEntities(le.Op, entity, leftEntities, rightEntities) + entities := parseEntitiesDeprecated(le.Op, entity, leftEntities, rightEntities) if entities == nil { return nil, nil, nil } @@ -120,7 +121,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ return nil, nil, errInvalidCriteriaType } -func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, expr LiteralExpr, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) { +func parseConditionDeprecated(cond *modelv1.Condition, indexRule *databasev1.IndexRule, expr LiteralExpr, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) { switch cond.Op { case modelv1.Condition_BINARY_OP_GT: return newRange(indexRule, index.RangeOpts{ @@ -178,7 +179,7 @@ func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, ex return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "index filter parses %v", cond) } -func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *modelv1.Condition) (LiteralExpr, []tsdb.Entity, error) { +func parseExprOrEntityDeprecated(entityDict map[string]int, entity tsdb.Entity, cond *modelv1.Condition) (LiteralExpr, []tsdb.Entity, error) { entityIdx, ok := entityDict[cond.Name] if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != modelv1.Condition_BINARY_OP_IN { return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "tag belongs to the entity only supports EQ or IN operation in condition(%v)", cond) @@ -236,7 +237,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode return nil, nil, errors.WithMessagef(errUnsupportedConditionValue, "index filter parses 
%v", cond) } -func parseEntities(op modelv1.LogicalExpression_LogicalOp, input tsdb.Entity, left, right []tsdb.Entity) []tsdb.Entity { +func parseEntitiesDeprecated(op modelv1.LogicalExpression_LogicalOp, input tsdb.Entity, left, right []tsdb.Entity) []tsdb.Entity { count := len(input) result := make(tsdb.Entity, count) anyEntity := func(entities []tsdb.Entity) bool { @@ -304,6 +305,275 @@ func parseEntities(op modelv1.LogicalExpression_LogicalOp, input tsdb.Entity, le return []tsdb.Entity{result} } +func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[string]int, + entity []*modelv1.TagValue, mandatoryIndexRule bool, +) (index.Filter, [][]*modelv1.TagValue, error) { + if criteria == nil { + return nil, [][]*modelv1.TagValue{entity}, nil + } + switch criteria.GetExp().(type) { + case *modelv1.Criteria_Condition: + cond := criteria.GetCondition() + expr, parsedEntity, err := parseExprOrEntity(entityDict, entity, cond) + if err != nil { + return nil, nil, err + } + if parsedEntity != nil { + return nil, parsedEntity, nil + } + if ok, indexRule := schema.IndexDefined(cond.Name); ok { + if indexRule.Location == databasev1.IndexRule_LOCATION_GLOBAL { + switch cond.Op { + case modelv1.Condition_BINARY_OP_EQ, modelv1.Condition_BINARY_OP_IN: + return nil, nil, GlobalIndexError{ + IndexRule: indexRule, + Expr: expr, + } + default: + return nil, nil, errors.Wrapf(errUnsupportedConditionOp, "gobal index conf:%s", cond) + } + } + return parseCondition(cond, indexRule, expr, entity) + } else if mandatoryIndexRule { + return nil, nil, errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond) + } + return eNode, [][]*modelv1.TagValue{entity}, nil + case *modelv1.Criteria_Le: + le := criteria.GetLe() + if le.GetLeft() == nil && le.GetRight() == nil { + return nil, nil, errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) + } + if le.GetLeft() == nil { + return 
BuildLocalFilter(le.Right, schema, entityDict, entity, mandatoryIndexRule) + } + if le.GetRight() == nil { + return BuildLocalFilter(le.Left, schema, entityDict, entity, mandatoryIndexRule) + } + left, leftEntities, err := BuildLocalFilter(le.Left, schema, entityDict, entity, mandatoryIndexRule) + if err != nil { + return nil, nil, err + } + right, rightEntities, err := BuildLocalFilter(le.Right, schema, entityDict, entity, mandatoryIndexRule) + if err != nil { + return nil, nil, err + } + entities := parseEntities(le.Op, entity, leftEntities, rightEntities) + if entities == nil { + return nil, nil, nil + } + if left == nil && right == nil { + return nil, entities, nil + } + switch le.Op { + case modelv1.LogicalExpression_LOGICAL_OP_AND: + and := newAnd(2) + and.append(left).append(right) + return and, entities, nil + case modelv1.LogicalExpression_LOGICAL_OP_OR: + or := newOr(2) + or.append(left).append(right) + return or, entities, nil + } + } + return nil, nil, errInvalidCriteriaType +} + +func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, expr LiteralExpr, entity []*modelv1.TagValue) (index.Filter, [][]*modelv1.TagValue, error) { + switch cond.Op { + case modelv1.Condition_BINARY_OP_GT: + return newRange(indexRule, index.RangeOpts{ + Lower: bytes.Join(expr.Bytes(), nil), + }), [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_GE: + return newRange(indexRule, index.RangeOpts{ + IncludesLower: true, + Lower: bytes.Join(expr.Bytes(), nil), + }), [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_LT: + return newRange(indexRule, index.RangeOpts{ + Upper: bytes.Join(expr.Bytes(), nil), + }), [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_LE: + return newRange(indexRule, index.RangeOpts{ + IncludesUpper: true, + Upper: bytes.Join(expr.Bytes(), nil), + }), [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_EQ: + return newEq(indexRule, expr), 
[][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_MATCH: + return newMatch(indexRule, expr), [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_NE: + return newNot(indexRule, newEq(indexRule, expr)), [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_HAVING: + bb := expr.Bytes() + and := newAnd(len(bb)) + for _, b := range bb { + and.append(newEq(indexRule, newBytesLiteral(b))) + } + return and, [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_NOT_HAVING: + bb := expr.Bytes() + and := newAnd(len(bb)) + for _, b := range bb { + and.append(newEq(indexRule, newBytesLiteral(b))) + } + return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_IN: + bb := expr.Bytes() + or := newOr(len(bb)) + for _, b := range bb { + or.append(newEq(indexRule, newBytesLiteral(b))) + } + return or, [][]*modelv1.TagValue{entity}, nil + case modelv1.Condition_BINARY_OP_NOT_IN: + bb := expr.Bytes() + or := newOr(len(bb)) + for _, b := range bb { + or.append(newEq(indexRule, newBytesLiteral(b))) + } + return newNot(indexRule, or), [][]*modelv1.TagValue{entity}, nil + } + return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "index filter parses %v", cond) +} + +func parseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) { + entityIdx, ok := entityDict[cond.Name] + if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != modelv1.Condition_BINARY_OP_IN { + return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "tag belongs to the entity only supports EQ or IN operation in condition(%v)", cond) + } + switch v := cond.Value.Value.(type) { + case *modelv1.TagValue_Str: + if ok { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = cond.Value + return nil, [][]*modelv1.TagValue{parsedEntity}, nil + } + 
return str(v.Str.GetValue()), nil, nil + case *modelv1.TagValue_StrArray: + if ok && cond.Op == modelv1.Condition_BINARY_OP_IN { + entities := make([][]*modelv1.TagValue, len(v.StrArray.Value)) + for i, va := range v.StrArray.Value { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: va, + }, + }, + } + entities[i] = parsedEntity + } + return nil, entities, nil + } + return &strArrLiteral{ + arr: v.StrArray.GetValue(), + }, nil, nil + case *modelv1.TagValue_Int: + if ok { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = cond.Value + return nil, [][]*modelv1.TagValue{parsedEntity}, nil + } + return &int64Literal{ + int64: v.Int.GetValue(), + }, nil, nil + case *modelv1.TagValue_IntArray: + if ok && cond.Op == modelv1.Condition_BINARY_OP_IN { + entities := make([][]*modelv1.TagValue, len(v.IntArray.Value)) + for i, va := range v.IntArray.Value { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: va, + }, + }, + } + entities[i] = parsedEntity + } + return nil, entities, nil + } + return &int64ArrLiteral{ + arr: v.IntArray.GetValue(), + }, nil, nil + case *modelv1.TagValue_Null: + return nullLiteralExpr, nil, nil + } + return nil, nil, errors.WithMessagef(errUnsupportedConditionValue, "index filter parses %v", cond) +} + +func parseEntities(op modelv1.LogicalExpression_LogicalOp, input []*modelv1.TagValue, left, right [][]*modelv1.TagValue) [][]*modelv1.TagValue { + count := len(input) + result := make([]*modelv1.TagValue, count) + anyEntity := func(entities [][]*modelv1.TagValue) bool { + for _, entity := range entities { + for _, entry := range entity { + if entry == pbv1.AnyTagValue { + return false + } + } + } + return 
true + } + leftAny := anyEntity(left) + rightAny := anyEntity(right) + + mergedEntities := make([][]*modelv1.TagValue, 0, len(left)+len(right)) + + switch op { + case modelv1.LogicalExpression_LOGICAL_OP_AND: + if leftAny && !rightAny { + return right + } + if !leftAny && rightAny { + return left + } + mergedEntities = append(mergedEntities, left...) + mergedEntities = append(mergedEntities, right...) + for i := 0; i < count; i++ { + entry := pbv1.AnyTagValue + for j := 0; j < len(mergedEntities); j++ { + e := mergedEntities[j][i] + if e == pbv1.AnyTagValue { + continue + } + if entry == pbv1.AnyTagValue { + entry = e + } else if pbv1.MustCompareTagValue(entry, e) != 0 { + return nil + } + } + result[i] = entry + } + case modelv1.LogicalExpression_LOGICAL_OP_OR: + if leftAny { + return left + } + if rightAny { + return right + } + mergedEntities = append(mergedEntities, left...) + mergedEntities = append(mergedEntities, right...) + for i := 0; i < count; i++ { + entry := pbv1.AnyTagValue + for j := 0; j < len(mergedEntities); j++ { + e := mergedEntities[j][i] + if entry == pbv1.AnyTagValue { + entry = e + } else if pbv1.MustCompareTagValue(entry, e) != 0 { + return mergedEntities + } + } + result[i] = entry + } + } + return [][]*modelv1.TagValue{result} +} + type fieldKey struct { *databasev1.IndexRule } diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index 7d9a1fa8..d7866d80 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -20,19 +20,16 @@ package measure import ( "context" "fmt" - "io" "time" - "go.uber.org/multierr" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -51,8 +48,17 @@ type unresolvedIndexScan struct { } func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, error) { + var projTags []pbv1.TagProjection var projTagsRefs [][]*logical.TagRef if len(uis.projectionTags) > 0 { + for i := range uis.projectionTags { + for _, tag := range uis.projectionTags[i] { + projTags = append(projTags, pbv1.TagProjection{ + Family: tag.GetFamilyName(), + Name: tag.GetTagName(), + }) + } + } var err error projTagsRefs, err = s.CreateTagRef(uis.projectionTags...) if err != nil { @@ -60,8 +66,12 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, error) } } + var projField []string var projFieldRefs []*logical.FieldRef if len(uis.projectionFields) > 0 { + for i := range uis.projectionFields { + projField = append(projField, uis.projectionFields[i].Name) + } var err error projFieldRefs, err = s.CreateFieldRef(uis.projectionFields...) 
// Execute runs the index scan: it issues one measure query per resolved
// entity candidate and wraps all results into a single MIterator.
func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, err error) {
	var orderBy *pbv1.OrderBy
	if i.order.Index != nil {
		orderBy = &pbv1.OrderBy{
			Index: i.order.Index,
			Sort:  i.order.Sort,
		}
	} else if i.groupByEntity {
		// No sorting index: when grouping by entity, still propagate the sort
		// direction so the storage layer orders series consistently.
		orderBy = &pbv1.OrderBy{
			Sort: i.order.Sort,
		}
	}
	ec := executor.FromMeasureExecutionContext(ctx)
	var results []pbv1.MeasureQueryResult
	for _, e := range i.entities {
		// One query per entity candidate; candidates come from
		// logical.BuildLocalFilter and may contain wildcard entries.
		result, err := ec.Query(ctx, pbv1.MeasureQueryOptions{
			Name:            i.metadata.GetName(),
			TimeRange:       &i.timeRange,
			Entity:          e,
			Filter:          i.filter,
			Order:           orderBy,
			TagProjection:   i.projectionTags,
			FieldProjection: i.projectionFields,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to query measure: %w", err)
		}
		results = append(results, result)
	}
	if len(results) == 0 {
		// Nothing matched: hand back the shared empty iterator.
		return dummyIter, nil
	}
	return &resultMIterator{
		results: results,
	}, nil
}
- current *measurev1.DataPoint - context transformContext - max int - num int -} - -func newIndexScanIterator(inner sort.Iterator[tsdb.Item], context transformContext, max int) executor.MIterator { - return &indexScanIterator{ - inner: inner, - context: context, - max: max, - } -} - -func (ism *indexScanIterator) Next() bool { - if !ism.inner.Next() || ism.err != nil || ism.num > ism.max { - return false - } - nextItem := ism.inner.Val() - var err error - if ism.current, err = transform(nextItem, ism.context); err != nil { - ism.err = multierr.Append(ism.err, err) - } - ism.num++ - return true -} - -func (ism *indexScanIterator) Current() []*measurev1.DataPoint { - if ism.current == nil { - return nil - } - return []*measurev1.DataPoint{ism.current} -} - -func (ism *indexScanIterator) Close() error { - return multierr.Combine(ism.err, ism.inner.Close()) -} - -var _ executor.MIterator = (*seriesIterator)(nil) - -type seriesIterator struct { - err error - context transformContext - inner []tsdb.Iterator +type resultMIterator struct { + results []pbv1.MeasureQueryResult current []*measurev1.DataPoint index int - num int - max int } -func newSeriesMIterator(inner []tsdb.Iterator, context transformContext, max int) executor.MIterator { - return &seriesIterator{ - inner: inner, - context: context, - index: -1, - max: max, - } -} - -func (ism *seriesIterator) Next() bool { - if ism.err != nil || ism.num > ism.max { - return false - } - ism.index++ - if ism.index >= len(ism.inner) { +func (ei *resultMIterator) Next() bool { + if ei.index >= len(ei.results) { return false } - iter := ism.inner[ism.index] - if ism.current != nil { - ism.current = ism.current[:0] + + r := ei.results[ei.index].Pull() + if r == nil { + ei.index++ + return ei.Next() } - for iter.Next() { - dp, err := transform(iter.Val(), ism.context) - if err != nil { - ism.err = err - return false + for i := range r.Timestamps { + dp := &measurev1.DataPoint{ + Timestamp: timestamppb.New(time.Unix(0, 
int64(r.Timestamps[i]))), } - ism.current = append(ism.current, dp) - } - ism.num++ - return true -} -func (ism *seriesIterator) Current() []*measurev1.DataPoint { - return ism.current -} - -func (ism *seriesIterator) Close() error { - for _, i := range ism.inner { - ism.err = multierr.Append(ism.err, i.Close()) + for _, tf := range r.TagFamilies { + tagFamily := &modelv1.TagFamily{ + Name: tf.Name, + } + dp.TagFamilies = append(dp.TagFamilies, tagFamily) + for _, t := range tf.Tags { + tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{ + Key: t.Name, + Value: t.Values[i], + }) + } + } + for _, f := range r.Fields { + dp.Fields = append(dp.Fields, &measurev1.DataPoint_Field{ + Name: f.Name, + Value: f.Values[i], + }) + } } - return ism.err + + return true } -type transformContext struct { - ec executor.MeasureExecutionContext - projectionTagsRefs [][]*logical.TagRef - projectionFieldsRefs []*logical.FieldRef +func (ei *resultMIterator) Current() []*measurev1.DataPoint { + return ei.current } -func transform(item tsdb.Item, ism transformContext) (*measurev1.DataPoint, error) { - tagFamilies, err := logical.ProjectItem(ism.ec, item, ism.projectionTagsRefs) - if err != nil { - return nil, err +func (ei *resultMIterator) Close() error { + for _, result := range ei.results { + result.Release() } - dpFields := make([]*measurev1.DataPoint_Field, 0) - for _, f := range ism.projectionFieldsRefs { - fieldVal, parserFieldErr := ism.ec.ParseField(f.Field.Name, item) - if parserFieldErr != nil { - return nil, parserFieldErr - } - dpFields = append(dpFields, fieldVal) - } - return &measurev1.DataPoint{ - Fields: dpFields, - TagFamilies: tagFamilies, - Timestamp: timestamppb.New(time.Unix(0, int64(item.Time()))), - }, nil + return nil } var dummyIter = dummyMIterator{} diff --git a/pkg/query/logical/measure/topn_plan_localscan.go b/pkg/query/logical/measure/topn_plan_localscan.go index 702dc57b..e04ba384 100644 --- a/pkg/query/logical/measure/topn_plan_localscan.go +++ 
b/pkg/query/logical/measure/topn_plan_localscan.go @@ -21,20 +21,14 @@ package measure import ( "context" "fmt" - "io" "time" "github.com/pkg/errors" - "go.uber.org/multierr" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/banyand/measure" - "github.com/apache/skywalking-banyandb/banyand/tsdb" - "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -54,7 +48,16 @@ type unresolvedLocalScan struct { func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, error) { var projTagsRefs [][]*logical.TagRef + var projTags []pbv1.TagProjection if len(uls.projectionTags) > 0 { + for i := range uls.projectionTags { + for _, tag := range uls.projectionTags[i] { + projTags = append(projTags, pbv1.TagProjection{ + Family: tag.GetFamilyName(), + Name: tag.GetTagName(), + }) + } + } var err error projTagsRefs, err = s.CreateTagRef(uls.projectionTags...) if err != nil { @@ -63,7 +66,11 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, error) } var projFieldRefs []*logical.FieldRef + var projField []string if len(uls.projectionFields) > 0 { + for i := range uls.projectionFields { + projField = append(projField, uls.projectionFields[i].Name) + } var err error projFieldRefs, err = s.CreateFieldRef(uls.projectionFields...) 
if err != nil { @@ -81,34 +88,41 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, error) schema: s, projectionTagsRefs: projTagsRefs, projectionFieldsRefs: projFieldRefs, + projectionTags: projTags, + projectionFields: projField, metadata: uls.metadata, entity: entity, l: logger.GetLogger("topn", "measure", uls.metadata.Group, uls.metadata.Name, "local-index"), }, nil } -func (uls *unresolvedLocalScan) locateEntity(entityList []string) (tsdb.Entity, error) { +func (uls *unresolvedLocalScan) locateEntity(entityList []string) ([]*modelv1.TagValue, error) { entityMap := make(map[string]int) - entity := make([]tsdb.Entry, 1+1+len(entityList)) + entity := make([]*modelv1.TagValue, 1+1+len(entityList)) // sortDirection - entity[0] = convert.Int64ToBytes(int64(uls.sort.Number())) + entity[0] = &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: int64(uls.sort), + }, + }, + } // rankNumber - entity[1] = tsdb.AnyEntry + entity[1] = pbv1.AnyTagValue for idx, tagName := range entityList { entityMap[tagName] = idx + 2 // allow to make fuzzy search with partial conditions - entity[idx+2] = tsdb.AnyEntry + entity[idx+2] = pbv1.AnyTagValue } for _, pairQuery := range uls.conditions { if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ { return nil, errors.Errorf("tag belongs to the entity only supports EQ operation in condition(%v)", pairQuery) } if entityIdx, ok := entityMap[pairQuery.GetName()]; ok { - switch v := pairQuery.GetValue().GetValue().(type) { - case *modelv1.TagValue_Str: - entity[entityIdx] = []byte(v.Str.GetValue()) - case *modelv1.TagValue_Int: - entity[entityIdx] = convert.Int64ToBytes(v.Int.GetValue()) + entity[entityIdx] = pairQuery.Value + switch pairQuery.GetValue().GetValue().(type) { + case *modelv1.TagValue_Str, *modelv1.TagValue_Int: + entity[entityIdx] = pairQuery.Value default: return nil, errors.New("unsupported condition tag type for entity") } @@ -143,58 +157,30 @@ type localScan struct { 
// Execute queries the TopN rule's companion measure for the pre-aggregated
// data points matching the resolved entity and wraps the single result into
// an MIterator.
func (i *localScan) Execute(ctx context.Context) (mit executor.MIterator, err error) {
	ec := executor.FromMeasureExecutionContext(ctx)
	result, err := ec.Query(ctx, pbv1.MeasureQueryOptions{
		Name:      i.metadata.GetName(),
		TimeRange: &i.timeRange,
		// entity encodes [sortDirection, rankNumber, ...entity tags]; unset
		// positions carry the pbv1.AnyTagValue wildcard for fuzzy matching.
		Entity:          i.entity,
		Order:           &pbv1.OrderBy{Sort: i.sort},
		TagProjection:   i.projectionTags,
		FieldProjection: i.projectionFields,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to query measure: %w", err)
	}
	return &resultMIterator{
		results: []pbv1.MeasureQueryResult{result},
	}, nil
}
refs { for projIdx, ref := range refInFamily { - newCommonSchema.TagSpecMap[ref.Tag.getTagName()] = &TagSpec{ + newCommonSchema.TagSpecMap[ref.Tag.GetTagName()] = &TagSpec{ TagFamilyIdx: projFamilyIdx, TagIdx: projIdx, Spec: ref.Spec.Spec, @@ -160,7 +160,7 @@ func (cs *CommonSchema) CreateRef(tags ...[]*Tag) ([][]*TagRef, error) { for i, tagInFamily := range tags { var tagRefsInFamily []*TagRef for _, tag := range tagInFamily { - if ts, ok := cs.TagSpecMap[tag.getTagName()]; ok { + if ts, ok := cs.TagSpecMap[tag.GetTagName()]; ok { tagRefsInFamily = append(tagRefsInFamily, &TagRef{tag, ts}) } else { return nil, errors.Wrap(errTagNotDefined, tag.GetCompoundName()) diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go index f6e2f3f0..89c851df 100644 --- a/pkg/query/logical/stream/stream_plan_tag_filter.go +++ b/pkg/query/logical/stream/stream_plan_tag_filter.go @@ -59,7 +59,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error) entity[idx] = tsdb.AnyEntry } var err error - ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria, s, entityDict, entity, false) + ctx.filter, ctx.entities, err = logical.BuildLocalFilterDeprecated(uis.criteria, s, entityDict, entity, false) if err != nil { var ge logical.GlobalIndexError if errors.As(err, &ge) {
