This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 2ded4101 Fix slice overflow when querying measure (#394) 2ded4101 is described below commit 2ded410158c95e809d95f34db09f4b82f81fbcf3 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Tue Mar 5 09:45:21 2024 +0800 Fix slice overflow when querying measure (#394) --- .github/workflows/slow-test.yml | 1 + banyand/measure/block.go | 122 ++++++++++-- banyand/measure/introducer.go | 13 +- banyand/measure/part_metadata.go | 15 ++ banyand/measure/query.go | 129 ++++++------ banyand/measure/query_test.go | 219 +++++++++++++++++++-- banyand/measure/tstable.go | 19 ++ banyand/measure/tstable_test.go | 6 + banyand/measure/write.go | 14 +- .../testdata/measures/service_latency_minute.json | 47 +++++ test/cases/init.go | 1 + test/cases/measure/data/input/all_latency.yaml | 26 +++ .../data/testdata/service_latency_minute_data.json | 206 +++++++++++++++++++ test/cases/measure/data/want/all_latency.yaml | 120 +++++++++++ test/cases/measure/measure.go | 1 + 15 files changed, 814 insertions(+), 125 deletions(-) diff --git a/.github/workflows/slow-test.yml b/.github/workflows/slow-test.yml index cc4fb09c..c62496d3 100644 --- a/.github/workflows/slow-test.yml +++ b/.github/workflows/slow-test.yml @@ -30,3 +30,4 @@ jobs: uses: ./.github/workflows/test.yml with: options: --label-filter slow + timeout-minutes: 120 diff --git a/banyand/measure/block.go b/banyand/measure/block.go index 707a1ae4..b23e8c34 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" @@ -438,7 +439,9 @@ func (bc *blockCursor) init(p *part, bm blockMetadata, queryOpts queryOptions) { bc.fieldProjection = queryOpts.FieldProjection } -func (bc *blockCursor) copyAllTo(r *pbv1.MeasureResult, desc bool) { +func (bc *blockCursor) copyAllTo(r *pbv1.MeasureResult, entityValuesAll map[common.SeriesID]map[string]*modelv1.TagValue, + tagProjection []pbv1.TagProjection, desc bool, +) { var idx, offset int if desc { idx = 0 @@ -450,18 +453,68 @@ func (bc *blockCursor) copyAllTo(r *pbv1.MeasureResult, desc bool) { if offset <= idx { return } + size := offset - idx r.SID = bc.bm.seriesID r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...) - for _, cf := range bc.tagFamilies { + var entityValues map[string]*modelv1.TagValue + if entityValuesAll != nil { + entityValues = entityValuesAll[r.SID] + } +OUTER: + for _, tp := range tagProjection { tf := pbv1.TagFamily{ - Name: cf.name, + Name: tp.Family, } - for _, c := range cf.columns { + var cf *columnFamily + for _, tagName := range tp.Names { t := pbv1.Tag{ - Name: c.name, + Name: tagName, } - for _, v := range c.values[idx:offset] { - t.Values = append(t.Values, mustDecodeTagValue(c.valueType, v)) + if entityValues != nil && entityValues[tagName] != nil { + t.Values = make([]*modelv1.TagValue, size) + for i := 0; i < size; i++ { + t.Values[i] = entityValues[tagName] + } + tf.Tags = append(tf.Tags, t) + continue + } + if cf == nil { + for i := range bc.tagFamilies { + if bc.tagFamilies[i].name == tp.Family { + cf = &bc.tagFamilies[i] + break + } + } + } + if cf == nil { + for _, n := range tp.Names { + t = pbv1.Tag{ + Name: n, + Values: make([]*modelv1.TagValue, size), + } + for i := 0; i < size; i++ { + t.Values[i] = pbv1.NullTagValue + } + tf.Tags = append(tf.Tags, t) + } + r.TagFamilies = append(r.TagFamilies, tf) + continue OUTER + } + var foundTag bool + for i := range cf.columns { + if cf.columns[i].name == tagName { + for _, v := range cf.columns[i].values[idx:offset] { + t.Values = append(t.Values, mustDecodeTagValue(cf.columns[i].valueType, v)) + } + foundTag = true + break + } + } + if !foundTag { + t.Values = make([]*modelv1.TagValue, size) + for i := 0; i < size; i++ { + t.Values[i] = pbv1.NullTagValue + } } tf.Tags = append(tf.Tags, t) } @@ -478,11 +531,17 @@ func (bc *blockCursor) copyAllTo(r *pbv1.MeasureResult, desc bool) { } } -func (bc *blockCursor) copyTo(r *pbv1.MeasureResult) { +func (bc *blockCursor) copyTo(r *pbv1.MeasureResult, entityValuesAll map[common.SeriesID]map[string]*modelv1.TagValue, + tagProjection []pbv1.TagProjection, +) { r.SID = bc.bm.seriesID r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx]) - if len(r.TagFamilies) != len(bc.tagProjection) { - for _, tp := range bc.tagProjection { + var entityValues map[string]*modelv1.TagValue + if entityValuesAll != nil { + entityValues = entityValuesAll[r.SID] + } + if len(r.TagFamilies) == 0 { + for _, tp := range tagProjection { tf := pbv1.TagFamily{ Name: tp.Family, } @@ -495,19 +554,42 @@ func (bc *blockCursor) copyTo(r *pbv1.MeasureResult) { r.TagFamilies = append(r.TagFamilies, tf) } } - if len(bc.tagFamilies) != len(r.TagFamilies) { - logger.Panicf("unexpected number of tag families: got %d; want %d", len(bc.tagFamilies), len(r.TagFamilies)) - } - for i, cf := range bc.tagFamilies { - if len(r.TagFamilies[i].Tags) != len(cf.columns) { - logger.Panicf("unexpected number of tags: got %d; want %d", len(r.TagFamilies[i].Tags), len(bc.tagProjection[i].Names)) - } - for i2, c := range cf.columns { - r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType, c.values[bc.idx])) + for i := range r.TagFamilies { + tfName := r.TagFamilies[i].Name + var cf *columnFamily + for j := range r.TagFamilies[i].Tags { + tagName := r.TagFamilies[i].Tags[j].Name + if entityValues != nil && entityValues[tagName] != nil { + r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, entityValues[tagName]) + continue + } + if cf == nil { + for i := range bc.tagFamilies { + if bc.tagFamilies[i].name == tfName { + cf = &bc.tagFamilies[i] + break + } + } + } + if cf == nil { + r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, pbv1.NullTagValue) + continue + } + var foundTag bool + for _, c := range cf.columns { + if c.name == tagName { + r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, mustDecodeTagValue(c.valueType, c.values[bc.idx])) + foundTag = true + break + } + } + if !foundTag { + r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, pbv1.NullTagValue) + } } } - if len(r.Fields) != len(bc.fieldProjection) { + if len(r.Fields) == 0 { for _, n := range bc.fieldProjection { f := pbv1.Field{ Name: n, diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go index e3bb2e01..e371d246 100644 --- a/banyand/measure/introducer.go +++ b/banyand/measure/introducer.go @@ -146,7 +146,7 @@ func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint6 nextSnp := cur.copyAllTo(epoch) nextSnp.parts = append(nextSnp.parts, next) nextSnp.creator = snapshotCreatorMemPart - tst.replaceSnapshot(&nextSnp) + tst.replaceSnapshot(&nextSnp, false) if nextIntroduction.applied != nil { close(nextIntroduction.applied) } @@ -160,8 +160,7 @@ func (tst *tsTable) introduceFlushed(nextIntroduction *flusherIntroduction, epoc defer cur.decRef() nextSnp := cur.merge(epoch, nextIntroduction.flushed) nextSnp.creator = snapshotCreatorFlusher - tst.replaceSnapshot(&nextSnp) - tst.persistSnapshot(&nextSnp) + tst.replaceSnapshot(&nextSnp, true) if nextIntroduction.applied != nil { close(nextIntroduction.applied) } @@ -177,20 +176,22 @@ func (tst *tsTable) introduceMerged(nextIntroduction *mergerIntroduction, epoch nextSnp := cur.remove(epoch, nextIntroduction.merged) nextSnp.parts = append(nextSnp.parts, nextIntroduction.newPart) nextSnp.creator = nextIntroduction.creator - tst.replaceSnapshot(&nextSnp) - tst.persistSnapshot(&nextSnp) + tst.replaceSnapshot(&nextSnp, true) if nextIntroduction.applied != nil { close(nextIntroduction.applied) } } -func (tst *tsTable) replaceSnapshot(next *snapshot) { +func (tst *tsTable) replaceSnapshot(next *snapshot, persisted bool) { tst.Lock() defer tst.Unlock() if tst.snapshot != nil { tst.snapshot.decRef() } tst.snapshot = next + if persisted { + tst.persistSnapshot(next) + } } func (tst *tsTable) currentEpoch() uint64 { diff --git a/banyand/measure/part_metadata.go b/banyand/measure/part_metadata.go index 9b64c612..7faaaf12 100644 --- a/banyand/measure/part_metadata.go +++ b/banyand/measure/part_metadata.go @@ -21,6 +21,8 @@ import ( "encoding/json" "path/filepath" + "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -45,6 +47,19 @@ func (pm *partMetadata) reset() { pm.ID = 0 } +func validatePartMetadata(fileSystem fs.FileSystem, partPath string) error { + metadataPath := filepath.Join(partPath, metadataFilename) + metadata, err := fileSystem.Read(metadataPath) + if err != nil { + return errors.WithMessage(err, "cannot read metadata.json") + } + var pm partMetadata + if err := json.Unmarshal(metadata, &pm); err != nil { + return errors.WithMessage(err, "cannot parse metadata.json") + } + return nil +} + func (pm *partMetadata) mustReadMetadata(fileSystem fs.FileSystem, partPath string) { pm.reset() diff --git a/banyand/measure/query.go b/banyand/measure/query.go index ece4e1b1..66160ae8 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -33,7 +33,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) @@ -80,16 +79,6 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 } }() - tagNameIndex := make(map[string]partition.TagLocator) - tagFamilySpecs := s.schema.GetTagFamilies() - for i, tagFamilySpec := range tagFamilySpecs { - for j, tagSpec := range tagFamilySpec.GetTags() { - tagNameIndex[tagSpec.GetName()] = partition.TagLocator{ - FamilyOffset: i, - TagOffset: j, - } - } - } sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, EntityValues: mqo.Entity}, mqo.Filter, mqo.Order) if err != nil { return nil, err @@ -130,9 +119,25 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 if tstIter.Error() != nil { return nil, fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } + projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo, &result) + result.tagProjection = qo.TagProjection + qo.TagProjection = tagProjectionOnPart for tstIter.nextBlock() { bc := generateBlockCursor() p := tstIter.piHeap[0] + + seriesID := p.curBlock.seriesID + if result.entityValues != nil && result.entityValues[seriesID] == nil { + for i := range sl { + if sl[i].ID == seriesID { + tag := make(map[string]*modelv1.TagValue) + for name, offset := range projectedEntityOffsets { + tag[name] = sl[i].EntityValues[offset] + } + result.entityValues[seriesID] = tag + } + } + } bc.init(p.p, p.curBlock, qo) result.data = append(result.data, bc) } @@ -140,10 +145,6 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 return nil, fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error()) } - result.seriesList = sl - result.tagNameIndex = tagNameIndex - result.schema = s.schema - result.sidToIndex = make(map[common.SeriesID]int) for i, si := range originalSids { result.sidToIndex[si] = i @@ -164,13 +165,32 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 return &result, nil } -func mustEncodeTagValue(name string, tagType databasev1.TagType, tagValue *modelv1.TagValue, num int) [][]byte { - values := make([][]byte, num) - nv := encodeTagValue(name, tagType, tagValue) - for i := 0; i < num; i++ { - values[i] = nv.marshal() +func (s *measure) parseTagProjection(qo queryOptions, result *queryResult) (projectedEntityOffsets map[string]int, tagProjectionOnPart []pbv1.TagProjection) { + projectedEntityOffsets = make(map[string]int) + for i := range qo.TagProjection { + var found bool + for j := range qo.TagProjection[i].Names { + for k := range s.schema.GetEntity().GetTagNames() { + if qo.TagProjection[i].Names[j] == s.schema.GetEntity().GetTagNames()[k] { + projectedEntityOffsets[qo.TagProjection[i].Names[j]] = k + if result.entityValues == nil { + result.entityValues = make(map[common.SeriesID]map[string]*modelv1.TagValue) + } + } else { + if !found { + found = true + tagProjectionOnPart = append(tagProjectionOnPart, pbv1.TagProjection{ + Family: qo.TagProjection[i].Family, + }) + } + tagProjectionOnPart[len(tagProjectionOnPart)-1].Names = append( + tagProjectionOnPart[len(tagProjectionOnPart)-1].Names, + qo.TagProjection[i].Names[j]) + } + } + } } - return values + return } func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue { @@ -340,15 +360,14 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue { } type queryResult struct { - sidToIndex map[common.SeriesID]int - tagNameIndex map[string]partition.TagLocator - schema *databasev1.Measure - data []*blockCursor - snapshots []*snapshot - seriesList pbv1.SeriesList - loaded bool - orderByTS bool - ascTS bool + sidToIndex map[common.SeriesID]int + entityValues map[common.SeriesID]map[string]*modelv1.TagValue + tagProjection []pbv1.TagProjection + data []*blockCursor + snapshots []*snapshot + loaded bool + orderByTS bool + ascTS bool } func (qr *queryResult) Pull() *pbv1.MeasureResult { @@ -364,47 +383,9 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult { qr.data = append(qr.data[:i], qr.data[i+1:]...) i-- } - if qr.schema.GetEntity() == nil || len(qr.schema.GetEntity().GetTagNames()) == 0 { + if i < 0 { continue } - sidIndex := qr.sidToIndex[qr.data[i].bm.seriesID] - series := qr.seriesList[sidIndex] - entityMap := make(map[string]int) - tagFamilyMap := make(map[string]int) - for idx, entity := range qr.schema.GetEntity().GetTagNames() { - entityMap[entity] = idx + 1 - } - for idx, tagFamily := range qr.data[i].tagFamilies { - tagFamilyMap[tagFamily.name] = idx + 1 - } - for _, tagFamilyProj := range qr.data[i].tagProjection { - for j, tagProj := range tagFamilyProj.Names { - entityPos := entityMap[tagProj] - tagFamilyPos := tagFamilyMap[tagFamilyProj.Family] - if entityPos == 0 { - continue - } - if tagFamilyPos == 0 { - qr.data[i].tagFamilies[tagFamilyPos-1] = columnFamily{ - name: tagFamilyProj.Family, - columns: make([]column, 0), - } - } - offset := qr.tagNameIndex[tagProj] - tagFamilySpec := qr.schema.GetTagFamilies()[offset.FamilyOffset] - tagSpec := tagFamilySpec.GetTags()[offset.TagOffset] - if tagSpec.IndexedOnly { - continue - } - valueType := pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1]) - qr.data[i].tagFamilies[tagFamilyPos-1].columns = append(qr.data[i].tagFamilies[tagFamilyPos-1].columns[:j], - append([]column{{ - name: tagProj, - values: mustEncodeTagValue(tagProj, tagSpec.GetType(), series.EntityValues[entityPos-1], len(qr.data[i].timestamps)), - valueType: valueType, - }}, qr.data[i].tagFamilies[tagFamilyPos-1].columns[j:]...)...) - } - } if qr.orderByTimestampDesc() { qr.data[i].idx = len(qr.data[i].timestamps) - 1 } @@ -418,11 +399,11 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult { if len(qr.data) == 1 { r := &pbv1.MeasureResult{} bc := qr.data[0] - bc.copyAllTo(r, qr.orderByTimestampDesc()) + bc.copyAllTo(r, qr.entityValues, qr.tagProjection, qr.orderByTimestampDesc()) qr.data = qr.data[:0] return r } - return qr.merge() + return qr.merge(qr.entityValues, qr.tagProjection) } func (qr *queryResult) Release() { @@ -493,7 +474,9 @@ func (qr *queryResult) orderByTimestampDesc() bool { return qr.orderByTS && !qr.ascTS } -func (qr *queryResult) merge() *pbv1.MeasureResult { +func (qr *queryResult) merge(entityValuesAll map[common.SeriesID]map[string]*modelv1.TagValue, + tagProjection []pbv1.TagProjection, +) *pbv1.MeasureResult { step := 1 if qr.orderByTimestampDesc() { step = -1 @@ -515,7 +498,7 @@ func (qr *queryResult) merge() *pbv1.MeasureResult { logger.Panicf("following parts version should be less or equal to the previous one") } } else { - topBC.copyTo(result) + topBC.copyTo(result, entityValuesAll, tagProjection) lastPartVersion = topBC.p.partMetadata.ID } diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index d8c4cbd1..7247057e 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -70,6 +70,8 @@ func TestQueryResult(t *testing.T) { {Name: "singleTag", Tags: []pbv1.Tag{ {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, }}, }, Fields: []pbv1.Field{ @@ -82,16 +84,39 @@ func TestQueryResult(t *testing.T) { SID: 2, Timestamps: []int64{1}, TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, }}, }, Fields: nil, }, { - SID: 3, - Timestamps: []int64{1}, - TagFamilies: nil, + SID: 3, + Timestamps: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, }, @@ -117,6 +142,8 @@ func TestQueryResult(t *testing.T) { {Name: "singleTag", Tags: []pbv1.Tag{ {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value3")}}, {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(30)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, }}, }, Fields: []pbv1.Field{ @@ -129,16 +156,39 @@ func TestQueryResult(t *testing.T) { SID: 2, Timestamps: []int64{2}, TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag3")}}, {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag4")}}, }}, }, Fields: nil, }, { - SID: 3, - Timestamps: []int64{2}, - TagFamilies: nil, + SID: 3, + Timestamps: []int64{2}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(4440)}}, }, @@ -156,6 +206,8 @@ func TestQueryResult(t *testing.T) { {Name: "singleTag", Tags: []pbv1.Tag{ {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, }}, }, Fields: []pbv1.Field{ @@ -168,16 +220,39 @@ func TestQueryResult(t *testing.T) { SID: 2, Timestamps: []int64{1}, TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, }}, }, Fields: nil, }, { - SID: 3, - Timestamps: []int64{1}, - TagFamilies: nil, + SID: 3, + Timestamps: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, }, @@ -204,6 +279,8 @@ func TestQueryResult(t *testing.T) { {Name: "singleTag", Tags: []pbv1.Tag{ {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, }}, }, Fields: []pbv1.Field{ @@ -216,16 +293,39 @@ func TestQueryResult(t *testing.T) { SID: 2, Timestamps: []int64{1}, TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, }}, }, Fields: nil, }, { - SID: 3, - Timestamps: []int64{1}, - TagFamilies: nil, + SID: 3, + Timestamps: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, }, @@ -243,6 +343,8 @@ func TestQueryResult(t *testing.T) { {Name: "singleTag", Tags: []pbv1.Tag{ {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value3")}}, {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(30)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, }}, }, Fields: []pbv1.Field{ @@ -255,16 +357,39 @@ func TestQueryResult(t *testing.T) { SID: 2, Timestamps: []int64{2}, TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag3")}}, {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag4")}}, }}, }, Fields: nil, }, { - SID: 3, - Timestamps: []int64{2}, - TagFamilies: nil, + SID: 3, + Timestamps: []int64{2}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(4440)}}, }, @@ -291,6 +416,8 @@ func TestQueryResult(t *testing.T) { {Name: "singleTag", Tags: []pbv1.Tag{ {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, }}, }, Fields: []pbv1.Field{ @@ -303,16 +430,39 @@ func TestQueryResult(t *testing.T) { SID: 2, Timestamps: []int64{1}, TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1")}}, {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2")}}, }}, }, Fields: nil, }, { - SID: 3, - Timestamps: []int64{1}, - TagFamilies: nil, + SID: 3, + Timestamps: []int64{1}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, }, @@ -329,7 +479,16 @@ func TestQueryResult(t *testing.T) { SID: 2, Timestamps: []int64{1, 2}, TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag3")}}, {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2"), strTagValue("tag4")}}, }}, @@ -349,6 +508,8 @@ func TestQueryResult(t *testing.T) { {Name: "singleTag", Tags: []pbv1.Tag{ {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1"), strTagValue("value3")}}, {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10), int64TagValue(30)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, }}, }, Fields: []pbv1.Field{ @@ -358,9 +519,23 @@ func TestQueryResult(t *testing.T) { {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText), binaryDataFieldValue(longText)}}, }, }, { - SID: 3, - Timestamps: []int64{1, 2}, - TagFamilies: nil, + SID: 3, + Timestamps: []int64{1, 2}, + TagFamilies: []pbv1.TagFamily{ + {Name: "arrTag", Tags: []pbv1.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + {Name: "binaryTag", Tags: []pbv1.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + {Name: "singleTag", Tags: []pbv1.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "intTag", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}}, + }}, + }, Fields: []pbv1.Field{ {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110), int64FieldValue(4440)}}, }, @@ -389,6 +564,8 @@ func TestQueryResult(t *testing.T) { ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp) var result queryResult + // Query all tags + result.tagProjection = allTagProjections for ti.nextBlock() { bc := generateBlockCursor() p := ti.piHeap[0] diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index 356c937a..329c469b 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -72,6 +72,13 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, needToDelete = append(needToDelete, ee[i].Name()) continue } + err = validatePartMetadata(fileSystem, filepath.Join(rootPath, ee[i].Name())) + if err != nil { + l.Info().Err(err).Msg("cannot validate part metadata. skip and delete it") + needToDelete = append(needToDelete, ee[i].Name()) + continue + } + loadedParts = append(loadedParts, p) continue } @@ -87,6 +94,7 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, loadedSnapshots = append(loadedSnapshots, snapshot) } for i := range needToDelete { + l.Info().Str("path", filepath.Join(rootPath, needToDelete[i])).Msg("delete invalid directory or file") if err := fileSystem.DeleteFile(filepath.Join(rootPath, needToDelete[i])); err != nil { l.Warn().Err(err).Str("path", filepath.Join(rootPath, needToDelete[i])).Msg("failed to delete part. Please check manually") } @@ -125,6 +133,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { snp := snapshot{ epoch: epoch, } + needToPersist := false for _, id := range loadedParts { var find bool for j := range parts { @@ -136,6 +145,13 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { if !find { tst.gc.submitParts(id) } + err := validatePartMetadata(tst.fileSystem, partPath(tst.root, id)) + if err != nil { + tst.l.Info().Err(err).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it") + tst.gc.submitParts(id) + needToPersist = true + continue + } p := mustOpenFilePart(id, tst.root, tst.fileSystem) p.partMetadata.ID = id snp.parts = append(snp.parts, newPartWrapper(nil, p)) @@ -150,6 +166,9 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { } snp.incRef() tst.snapshot = &snp + if needToPersist { + tst.persistSnapshot(&snp) + } } func (tst *tsTable) startLoop(cur uint64) { diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index 8e955c4d..bbbae3a8 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -394,6 +394,12 @@ var tagProjections = map[int][]pbv1.TagProjection{ }, } +var allTagProjections = []pbv1.TagProjection{ + {Family: "arrTag", Names: []string{"strArrTag", "intArrTag"}}, + {Family: "binaryTag", Names: []string{"binaryTag"}}, + {Family: "singleTag", Names: []string{"strTag", "intTag", "strTag1", "strTag2"}}, +} + var fieldProjections = map[int][]string{ 1: {"strField", "intField", "floatField", "binaryField"}, 3: {"intField"}, diff --git a/banyand/measure/write.go b/banyand/measure/write.go index 1b9b2095..a4075dab 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -125,11 +125,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me )) } dpt.dataPoints.fields = append(dpt.dataPoints.fields, field) - tagFamilies := make([]nameValues, len(stm.schema.TagFamilies)) + tagFamilies := make([]nameValues, 0, len(stm.schema.TagFamilies)) tagFamiliesForIndexWrite := make([]nameValues, len(stm.schema.TagFamilies)) - dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamilies) entityMap := make(map[string]bool) - for _, entity := range stm.GetSchema().GetEntity().GetTagNames() { entityMap[entity] = true } @@ -141,7 +139,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me tagFamily = req.DataPoint.TagFamilies[i] } tagFamilySpec := stm.GetSchema().GetTagFamilies()[i] - tagFamilies[i].name = tagFamilySpec.Name + tf := nameValues{ + name: tagFamilySpec.Name, + } for j := range tagFamilySpec.Tags { var tagValue *modelv1.TagValue if tagFamily == pbv1.NullTagFamily || len(tagFamily.Tags) <= j { @@ -158,9 +158,13 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me if tagFamilySpec.Tags[j].IndexedOnly || entityMap[tagFamilySpec.Tags[j].Name] { continue } - tagFamilies[i].values = append(tagFamilies[i].values, nameValue) + tf.values = append(tf.values, nameValue) + } + if len(tf.values) > 0 { + tagFamilies = append(tagFamilies, tf) } } + dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamilies) if stm.processorManager != nil { stm.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{ diff --git a/pkg/test/measure/testdata/measures/service_latency_minute.json b/pkg/test/measure/testdata/measures/service_latency_minute.json new file mode 100644 index 00000000..46ddf59c --- /dev/null +++ b/pkg/test/measure/testdata/measures/service_latency_minute.json @@ -0,0 +1,47 @@ +{ + "metadata": { + "name": "service_latency_minute", + "group": "sw_metric" + }, + "tag_families": [ + { + "name": "default", + "tags": [ + { + "name": "id", + "type": "TAG_TYPE_STRING" + } + ] + }, + { + "name": "storage_only", + "tags": [ + { + "name": "entity_id", + "type": "TAG_TYPE_STRING" + } + ] + } + ], + "fields": [ + { + "name": "total", + "field_type": "FIELD_TYPE_INT", + "encoding_method": "ENCODING_METHOD_GORILLA", + "compression_method": "COMPRESSION_METHOD_ZSTD" + }, + { + "name": "value", + "field_type": "FIELD_TYPE_INT", + "encoding_method": "ENCODING_METHOD_GORILLA", + "compression_method": "COMPRESSION_METHOD_ZSTD" + } + ], + "entity": { + "tag_names": [ + "entity_id" + ] + }, + "interval": "1m", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/test/cases/init.go b/test/cases/init.go index 72f66315..e8fff89d 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -50,4 +50,5 @@ func Initialize(addr string, now time.Time) { casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval) casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval) + casesmeasuredata.Write(conn, "service_latency_minute", "sw_metric", "service_latency_minute_data.json", now, interval) } diff --git a/test/cases/measure/data/input/all_latency.yaml b/test/cases/measure/data/input/all_latency.yaml new file mode 100644 index 00000000..cd48d603 --- /dev/null +++ b/test/cases/measure/data/input/all_latency.yaml @@ -0,0 +1,26 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +metadata: + name: "service_latency_minute" + group: "sw_metric" +tagProjection: + tagFamilies: + - name: "storage_only" + tags: ["entity_id"] +fieldProjection: + names: ["total", "value"] diff --git a/test/cases/measure/data/testdata/service_latency_minute_data.json b/test/cases/measure/data/testdata/service_latency_minute_data.json new file mode 100644 index 00000000..2040c6af --- /dev/null +++ b/test/cases/measure/data/testdata/service_latency_minute_data.json @@ -0,0 +1,206 @@ +[ + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + } + ] + }, + { + "tags": [ + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 1 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + } + ] + }, + { + "tags": [ + { + "str": { + "value": "entity_2" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 2 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + } + ] + }, + { + "tags": [ + { + "str": { + "value": "entity_3" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 3 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc2" + } + } + ] + }, + { + "tags": [ + { + "str": { + "value": "entity_4" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 5 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc2" + } + } + ] + }, + { + "tags": [ + { + "str": { + "value": "entity_5" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 50 + } + }, + { + "int": { + "value": 4 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc3" + } + } + ] + }, + { + "tags": [ + { + "str": { + "value": "entity_6" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 300 + } + }, + { + "int": { + "value": 6 + } + } + ] + } +] diff --git a/test/cases/measure/data/want/all_latency.yaml b/test/cases/measure/data/want/all_latency.yaml new file mode 100644 index 00000000..99515607 --- /dev/null +++ b/test/cases/measure/data/want/all_latency.yaml @@ -0,0 +1,120 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +dataPoints: +- fields: + - name: total + value: + int: + value: "100" + - name: value + value: + int: + value: "1" + tagFamilies: + - name: storage_only + tags: + - key: entity_id + value: + str: + value: entity_1 + timestamp: "2024-03-03T06:46:00Z" +- fields: + - name: total + value: + int: + value: "100" + - name: value + value: + int: + value: "2" + tagFamilies: + - name: storage_only + tags: + - key: entity_id + value: + str: + value: entity_2 + timestamp: "2024-03-03T06:47:00Z" +- fields: + - name: total + value: + int: + value: "100" + - name: value + value: + int: + value: "3" + tagFamilies: + - name: storage_only + tags: + - key: entity_id + value: + str: + value: entity_3 + timestamp: "2024-03-03T06:48:00Z" +- fields: + - name: total + value: + int: + value: "100" + - name: value + value: + int: + value: "5" + tagFamilies: + - name: storage_only + tags: + - key: entity_id + value: + str: + value: entity_4 + timestamp: "2024-03-03T06:49:00Z" +- fields: + - name: total + value: + int: + value: "50" + - name: value + value: + int: + value: "4" + tagFamilies: + - name: storage_only + tags: + - key: entity_id + value: + str: + value: entity_5 + timestamp: "2024-03-03T06:50:00Z" +- fields: + - name: total + value: + int: + value: "300" + - name: value + value: + int: + value: "6" + tagFamilies: + - name: storage_only + tags: + - key: entity_id + value: + str: + value: entity_6 + timestamp: "2024-03-03T06:51:00Z" diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index 3cc814ac..e0398efc 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -65,4 +65,5 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("In and not In expressions", helpers.Args{Input: "in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), )