This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c9961ab39ab8bb9680117772ba05131506b63809 Author: Huang Youliang <[email protected]> AuthorDate: Tue Apr 21 10:06:35 2026 +0800 Support multiple types for the same tag in a single trace or sidx part (#1066) * Support multiple types for the same tag in a single part Co-authored-by: Gao Hongtao <[email protected]> --- api/validate/validate.go | 26 ++++++ banyand/internal/sidx/block.go | 2 + banyand/internal/sidx/interfaces.go | 38 +++++---- banyand/internal/sidx/merge.go | 84 +++++++++++++++++-- banyand/internal/sidx/merge_test.go | 121 ++++++++++++++++++++++++++ banyand/internal/sidx/query_result.go | 26 ++++-- banyand/internal/sidx/scan_query.go | 17 ++-- banyand/internal/sidx/tag.go | 79 +++++++++++++++-- banyand/trace/block.go | 31 +++++-- banyand/trace/merger.go | 84 +++++++++++++++++-- banyand/trace/merger_bench_test.go | 8 +- banyand/trace/merger_test.go | 154 +++++++++++++++++++++++++++++++++- banyand/trace/query.go | 15 ++-- banyand/trace/svc_standalone.go | 1 + banyand/trace/tag.go | 53 ++++++++++++ banyand/trace/tag_test.go | 19 +++++ banyand/trace/trace_suite_test.go | 2 +- 17 files changed, 687 insertions(+), 73 deletions(-) diff --git a/api/validate/validate.go b/api/validate/validate.go index 61e7b5d75..946c99061 100644 --- a/api/validate/validate.go +++ b/api/validate/validate.go @@ -20,11 +20,22 @@ package validate import ( "errors" + "fmt" + "strings" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" ) +const reservedTagSeparator = "#" + +func validateTagName(name string) error { + if strings.Contains(name, reservedTagSeparator) { + return fmt.Errorf("tag name %q must not contain reserved character %q", name, reservedTagSeparator) + } + return nil +} + // Group validates the provided Group object. 
func Group(group *commonv1.Group) error { if group.Metadata == nil { @@ -188,16 +199,28 @@ func Trace(trace *databasev1.Trace) error { if trace.TraceIdTagName == "" { return errors.New("trace_id_tag_name is empty") } + if err := validateTagName(trace.TraceIdTagName); err != nil { + return err + } if trace.SpanIdTagName == "" { return errors.New("span_id_tag_name is empty") } + if err := validateTagName(trace.SpanIdTagName); err != nil { + return err + } if trace.TimestampTagName == "" { return errors.New("timestamp_tag_name is empty") } + if err := validateTagName(trace.TimestampTagName); err != nil { + return err + } for i := range trace.Tags { if trace.Tags[i].Name == "" { return errors.New("trace tag name is empty") } + if err := validateTagName(trace.Tags[i].Name); err != nil { + return err + } if trace.Tags[i].Type == databasev1.TagType_TAG_TYPE_UNSPECIFIED { return errors.New("trace tag type is unspecified") } @@ -236,6 +259,9 @@ func tagFamily(tagFamilies []*databasev1.TagFamilySpec) error { if tagFamilies[i].Tags[j].Name == "" { return errors.New("tag name is empty") } + if err := validateTagName(tagFamilies[i].Tags[j].Name); err != nil { + return err + } if tagFamilies[i].Tags[j].Type == databasev1.TagType_TAG_TYPE_UNSPECIFIED { return errors.New("tag type is unspecified") } diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go index c240dbbfc..3522b474b 100644 --- a/banyand/internal/sidx/block.go +++ b/banyand/internal/sidx/block.go @@ -482,6 +482,8 @@ func fastTagAppend(bi, b *blockPointer, offset int) error { if _, exists := b.tags[t.name]; !exists { return fmt.Errorf("unexpected tag name for tag %q", t.name) } + } + for _, t := range bi.tags { assertIdxAndOffset(t.name, len(b.tags[t.name].values), b.idx, offset) bi.tags[t.name].values = append(bi.tags[t.name].values, b.tags[t.name].values[b.idx:offset]...) 
} diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go index 058c8626f..215328507 100644 --- a/banyand/internal/sidx/interfaces.go +++ b/banyand/internal/sidx/interfaces.go @@ -105,16 +105,17 @@ type WriteRequest struct { // QueryRequest specifies parameters for a query operation, following StreamQueryOptions pattern. type QueryRequest struct { - Filter index.Filter - TagFilter model.TagFilterMatcher - Order *index.OrderBy - MinKey *int64 - MaxKey *int64 - MinTimestamp *int64 - MaxTimestamp *int64 - SeriesIDs []common.SeriesID - TagProjection []model.TagProjection - MaxBatchSize int + Filter index.Filter + TagFilter model.TagFilterMatcher + Order *index.OrderBy + MinKey *int64 + MaxKey *int64 + SchemaTagTypes map[string]pbv1.ValueType + MinTimestamp *int64 + MaxTimestamp *int64 + SeriesIDs []common.SeriesID + TagProjection []model.TagProjection + MaxBatchSize int } // ScanProgressFunc is a callback for reporting scan progress. @@ -127,14 +128,15 @@ type ScanProgressFunc func(currentPart, totalParts int, rowsFound int) // //nolint:govet // struct layout optimized for readability; 64 bytes is acceptable type ScanQueryRequest struct { - TagFilter model.TagFilterMatcher - OnProgress ScanProgressFunc - MinKey *int64 - MaxKey *int64 - MinTimestamp *int64 - MaxTimestamp *int64 - TagProjection []model.TagProjection - MaxBatchSize int + TagFilter model.TagFilterMatcher + SchemaTagTypes map[string]pbv1.ValueType + OnProgress ScanProgressFunc + MinKey *int64 + MaxKey *int64 + MinTimestamp *int64 + MaxTimestamp *int64 + TagProjection []model.TagProjection + MaxBatchSize int } // QueryResponse contains a batch of query results and execution metadata. 
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go index ac901a2b5..83c18f3a4 100644 --- a/banyand/internal/sidx/merge.go +++ b/banyand/internal/sidx/merge.go @@ -19,9 +19,11 @@ package sidx import ( "fmt" + "io" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) var ( @@ -80,6 +82,7 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, par return nil, errNoPartToMerge } dstPath := partPath(root, partID) + conflictTags := collectConflictTags(parts) var totalSize int64 pii := make([]*partMergeIter, 0, len(parts)) for i := range parts { @@ -94,7 +97,7 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, par bw := generateBlockWriter() bw.mustInitForFilePart(fileSystem, dstPath, shouldCache) - pm, err := mergeBlocks(closeCh, bw, br) + pm, err := mergeBlocks(closeCh, bw, br, conflictTags) releaseBlockWriter(bw) releaseBlockReader(br) for i := range pii { @@ -133,7 +136,7 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, par return newPartWrapper(nil, p), nil } -func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*partMetadata, error) { +func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader, conflictTags map[string]struct{}) (*partMetadata, error) { pendingBlockIsEmpty := true pendingBlock := generateBlockPointer() defer releaseBlockPointer(pendingBlock) @@ -151,6 +154,10 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa decoder = nil } } + loadAndRename := func() { + br.loadBlockData(getDecoder()) + renameConflictTags(&br.block.block, conflictTags) + } for br.nextBlockMetadata() { select { case <-closeCh: @@ -160,7 +167,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa b := br.block if pendingBlockIsEmpty { - br.loadBlockData(getDecoder()) + 
loadAndRename() pendingBlock.copyFrom(b) pendingBlockIsEmpty = false continue @@ -171,7 +178,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa pendingBlock.block.uncompressedSizeBytes() >= maxUncompressedBlockSize { bw.mustWriteBlock(pendingBlock.bm.seriesID, &pendingBlock.block) releaseDecoder() - br.loadBlockData(getDecoder()) + loadAndRename() pendingBlock.copyFrom(b) continue } @@ -182,7 +189,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa } tmpBlock.reset() tmpBlock.bm.seriesID = b.bm.seriesID - br.loadBlockData(getDecoder()) + loadAndRename() mergeTwoBlocks(tmpBlock, pendingBlock, b) if len(tmpBlock.userKeys) <= maxBlockLength && tmpBlock.block.uncompressedSizeBytes() <= maxUncompressedBlockSize { if len(tmpBlock.userKeys) == 0 { @@ -209,6 +216,73 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa return &result, nil } +func collectConflictTags(parts []*partWrapper) map[string]struct{} { + tagTypes := make(map[string]map[pbv1.ValueType]struct{}) + for _, pw := range parts { + p := pw.p + for tt := range p.tagMetadata { + t := tt + var vt pbv1.ValueType + if hasTypeSuffix(tt) { + t = decodeTypedTag(tt) + vt = valueType(tt) + } else { + var readErr error + vt, readErr = readFirstTagValueType(p.tagMetadata[tt]) + if readErr != nil { + continue + } + } + if tagTypes[t] == nil { + tagTypes[t] = make(map[pbv1.ValueType]struct{}) + } + tagTypes[t][vt] = struct{}{} + } + } + var result map[string]struct{} + for tag, types := range tagTypes { + if len(types) > 1 { + if result == nil { + result = make(map[string]struct{}) + } + result[tag] = struct{}{} + } + } + return result +} + +func readFirstTagValueType(tmReader fs.Reader) (pbv1.ValueType, error) { + sr := tmReader.SequentialRead() + defer fs.MustClose(sr) + buf := make([]byte, 512) + n, readErr := io.ReadFull(sr, buf) + if readErr != nil && n == 0 { + return pbv1.ValueTypeUnknown, fmt.Errorf("cannot read tag 
metadata: %w", readErr) + } + tm := generateTagMetadata() + defer releaseTagMetadata(tm) + if unmarshalErr := tm.unmarshal(buf[:n]); unmarshalErr != nil { + return pbv1.ValueTypeUnknown, fmt.Errorf("cannot unmarshal tag metadata: %w", unmarshalErr) + } + return tm.valueType, nil +} + +func renameConflictTags(b *block, conflictTags map[string]struct{}) { + if len(conflictTags) == 0 { + return + } + for t := range conflictTags { + td, exists := b.tags[t] + if !exists { + continue + } + typedName := encodeTypedTag(t, td.valueType) + td.name = typedName + b.tags[typedName] = td + delete(b.tags, t) + } +} + func mergeTwoBlocks(target, left, right *blockPointer) { appendIfEmpty := func(ib1, ib2 *blockPointer) bool { if ib1.idx >= len(ib1.userKeys) { diff --git a/banyand/internal/sidx/merge_test.go b/banyand/internal/sidx/merge_test.go index b62f081c0..64c126aa9 100644 --- a/banyand/internal/sidx/merge_test.go +++ b/banyand/internal/sidx/merge_test.go @@ -29,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/fs" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/test" @@ -482,6 +483,126 @@ func Test_mergeParts(t *testing.T) { } } +func Test_mergePartsWithConflictTags(t *testing.T) { + esConflictStr := func() *elements { + es := generateElements() + es.mustAppend(1, 100, make([]byte, 10), []Tag{ + {Name: "status", Value: []byte("ok"), ValueType: pbv1.ValueTypeStr}, + }) + es.mustAppend(2, 200, make([]byte, 10), []Tag{ + {Name: "status", Value: []byte("error"), ValueType: pbv1.ValueTypeStr}, + }) + es.mustAppend(3, 300, make([]byte, 10), []Tag{ + {Name: "status", Value: []byte("warn"), ValueType: pbv1.ValueTypeStr}, + }) + return es + }() + + esConflictInt := func() *elements { + es := generateElements() + es.mustAppend(1, 150, make([]byte, 10), []Tag{ + 
{Name: "status", Value: convert.Int64ToBytes(200), ValueType: pbv1.ValueTypeInt64}, + }) + es.mustAppend(2, 250, make([]byte, 10), []Tag{ + {Name: "status", Value: convert.Int64ToBytes(500), ValueType: pbv1.ValueTypeInt64}, + }) + es.mustAppend(3, 350, make([]byte, 10), []Tag{ + {Name: "status", Value: convert.Int64ToBytes(404), ValueType: pbv1.ValueTypeInt64}, + }) + return es + }() + + verify := func(t *testing.T, pp []*partWrapper, fileSystem fs.FileSystem, root string, partID uint64) { + closeCh := make(chan struct{}) + defer close(closeCh) + s := &sidx{pm: protector.Nop{}} + p, mergeErr := s.mergeParts(fileSystem, closeCh, pp, partID, root) + require.NoError(t, mergeErr) + defer func() { + if p != nil { + p.release() + } + }() + + pmi := &partMergeIter{} + pmi.mustInitFromPart(p.p) + reader := &blockReader{} + reader.init([]*partMergeIter{pmi}) + decoder := generateTagValuesDecoder() + defer releaseTagValuesDecoder(decoder) + + var blockCount int + for reader.nextBlockMetadata() { + reader.loadBlockData(decoder) + b := &reader.block.block + blockCount++ + + _, hasBareStatus := b.tags["status"] + require.False(t, hasBareStatus, + "block %d (seriesID=%d) has bare 'status' tag; expected renamed tags only", + blockCount, reader.block.bm.seriesID) + + hasStr := false + hasInt := false + for tagName := range b.tags { + if tagName == "status#str" { + hasStr = true + } + if tagName == "status#int" { + hasInt = true + } + } + require.True(t, hasStr || hasInt, + "block %d (seriesID=%d) has neither 'status#str' nor 'status#int'", + blockCount, reader.block.bm.seriesID) + } + require.NoError(t, reader.error()) + require.Equal(t, 3, blockCount, "expected 3 blocks (one per seriesID)") + } + + esList := []*elements{esConflictStr, esConflictInt} + + t.Run("memory parts", func(t *testing.T) { + var pp []*partWrapper + tmpPath, defFn := test.Space(require.New(t)) + defer func() { + for _, pw := range pp { + pw.release() + } + defFn() + }() + for _, es := range esList { + mp := 
GenerateMemPart() + mp.mustInitFromElements(es) + pp = append(pp, newPartWrapper(mp, openMemPart(mp))) + } + verify(t, pp, fs.NewLocalFileSystem(), tmpPath, 1) + }) + + t.Run("file parts", func(t *testing.T) { + var fpp []*partWrapper + tmpPath, defFn := test.Space(require.New(t)) + defer func() { + for _, pw := range fpp { + pw.release() + } + defFn() + }() + fileSystem := fs.NewLocalFileSystem() + for i, es := range esList { + mp := GenerateMemPart() + mp.mustInitFromElements(es) + partPath := filepath.Join(tmpPath, "part_"+string(rune('0'+i))) + mp.mustFlush(fileSystem, partPath) + filePart := mustOpenPart(uint64(i), partPath, fileSystem) + filePW := newPartWrapper(nil, filePart) + fpp = append(fpp, filePW) + ReleaseMemPart(mp) + } + verify(t, fpp, fileSystem, tmpPath, uint64(len(esList))) + }) +} + func elementsToBlocks(esList []*elements) []block { merged := generateElements() defer releaseElements(merged) diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go index c483058fb..e547d16f0 100644 --- a/banyand/internal/sidx/query_result.go +++ b/banyand/internal/sidx/query_result.go @@ -114,16 +114,30 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata // Load tag data for selected tags only for tagName := range tagsToLoad { tagBlockInfo, exists := bm.tagsBlocks[tagName] - if !exists { - continue // Skip missing tags + if exists { + qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, int(bm.count), decoder) + continue } - - if !qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, int(bm.count), decoder) { - // Continue loading other tags even if one fails + schemaType, hasSchemaType := qr.request.SchemaTagTypes[tagName] + if !hasSchemaType { continue } + for typedTag, typedBlock := range bm.tagsBlocks { + if decodeTypedTag(typedTag) != tagName { + continue + } + if valueType(typedTag) != schemaType { + continue + } + if qr.loadTagData(tmpBlock, p, typedTag, &typedBlock, int(bm.count), decoder) { + 
td := tmpBlock.tags[typedTag] + td.name = tagName + tmpBlock.tags[tagName] = td + delete(tmpBlock.tags, typedTag) + } + break + } } - return len(tmpBlock.userKeys) > 0 } diff --git a/banyand/internal/sidx/scan_query.go b/banyand/internal/sidx/scan_query.go index 4a063d3a8..c72b3c8f6 100644 --- a/banyand/internal/sidx/scan_query.go +++ b/banyand/internal/sidx/scan_query.go @@ -151,16 +151,17 @@ func (s *sidx) scanPart(ctx context.Context, pw *partWrapper, req ScanQueryReque } // Create a temporary QueryRequest to reuse existing blockCursor infrastructure - tmpReq := QueryRequest{ - TagFilter: req.TagFilter, - MinKey: &minKey, - MaxKey: &maxKey, - TagProjection: req.TagProjection, - MaxBatchSize: maxBatchSize, + qr := QueryRequest{ + TagFilter: req.TagFilter, + SchemaTagTypes: req.SchemaTagTypes, + MinKey: &minKey, + MaxKey: &maxKey, + TagProjection: req.TagProjection, + MaxBatchSize: maxBatchSize, } bc := generateBlockCursor() - bc.init(p, pi.curBlock, tmpReq) + bc.init(p, pi.curBlock, qr) // Load block data tmpBlock := generateBlock() @@ -184,7 +185,7 @@ func (s *sidx) scanPart(ctx context.Context, pw *partWrapper, req ScanQueryReque if s.loadBlockCursor(bc, tmpBlock, blockScanResult{ p: p, bm: *pi.curBlock, - }, tagsToLoad, tmpReq, s.pm, nil) { + }, tagsToLoad, qr, s.pm, nil) { // Copy all rows from this block cursor to current batch for idx := 0; idx < len(bc.userKeys); idx++ { bc.idx = idx diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go index e1f986060..6686f9c88 100644 --- a/banyand/internal/sidx/tag.go +++ b/banyand/internal/sidx/tag.go @@ -19,6 +19,7 @@ package sidx import ( "fmt" + "strings" internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/pkg/bytes" @@ -28,6 +29,66 @@ import ( "github.com/apache/skywalking-banyandb/pkg/pool" ) +const typedTagSeparator = "#" + +var ( + valueTypeToSuffix = map[pbv1.ValueType]string{ + pbv1.ValueTypeStr: "str", + 
pbv1.ValueTypeInt64: "int", + pbv1.ValueTypeFloat64: "float", + pbv1.ValueTypeBinaryData: "bin", + pbv1.ValueTypeStrArr: "str_arr", + pbv1.ValueTypeInt64Arr: "int_arr", + pbv1.ValueTypeTimestamp: "ts", + pbv1.ValueTypeUnknown: "", + } + suffixToValueType = map[string]pbv1.ValueType{ + "str": pbv1.ValueTypeStr, + "int": pbv1.ValueTypeInt64, + "float": pbv1.ValueTypeFloat64, + "bin": pbv1.ValueTypeBinaryData, + "str_arr": pbv1.ValueTypeStrArr, + "int_arr": pbv1.ValueTypeInt64Arr, + "ts": pbv1.ValueTypeTimestamp, + } +) + +func encodeTypedTag(name string, vt pbv1.ValueType) string { + suffix, ok := valueTypeToSuffix[vt] + if !ok || suffix == "" { + return name + } + return name + typedTagSeparator + suffix +} + +func decodeTypedTag(key string) string { + for suffix := range suffixToValueType { + sepSuffix := typedTagSeparator + suffix + if strings.HasSuffix(key, sepSuffix) { + return key[:len(key)-len(sepSuffix)] + } + } + return key +} + +func hasTypeSuffix(key string) bool { + for suffix := range suffixToValueType { + if strings.HasSuffix(key, typedTagSeparator+suffix) { + return true + } + } + return false +} + +func valueType(name string) pbv1.ValueType { + for suffix, vt := range suffixToValueType { + if strings.HasSuffix(name, typedTagSeparator+suffix) { + return vt + } + } + return pbv1.ValueTypeUnknown +} + // dataBlock represents a reference to data in a file. type dataBlock struct { offset uint64 @@ -278,18 +339,18 @@ func (tm *tagMetadata) marshal(dst []byte) []byte { } // unmarshal deserializes tag metadata from bytes using encoding package. 
-func (tm *tagMetadata) unmarshal(src []byte) ([]byte, error) { +func (tm *tagMetadata) unmarshal(src []byte) error { var nameBytes []byte var err error src, nameBytes, err = pkgencoding.DecodeBytes(src) if err != nil { - return nil, fmt.Errorf("cannot unmarshal tagMetadata.name: %w", err) + return fmt.Errorf("cannot unmarshal tagMetadata.name: %w", err) } tm.name = string(nameBytes) if len(src) < 1 { - return nil, fmt.Errorf("cannot unmarshal tagMetadata.valueType: src is too short") + return fmt.Errorf("cannot unmarshal tagMetadata.valueType: src is too short") } tm.valueType = pbv1.ValueType(src[0]) src = src[1:] @@ -300,26 +361,26 @@ func (tm *tagMetadata) unmarshal(src []byte) ([]byte, error) { src, tm.filterBlock.size = pkgencoding.BytesToVarUint64(src) if len(src) < 1 { - return nil, fmt.Errorf("cannot unmarshal tagMetadata flags: src is too short") + return fmt.Errorf("cannot unmarshal tagMetadata flags: src is too short") } src, tm.min, err = pkgencoding.DecodeBytes(src) if err != nil { - return nil, fmt.Errorf("cannot unmarshal tagMetadata.min: %w", err) + return fmt.Errorf("cannot unmarshal tagMetadata.min: %w", err) } if len(tm.min) == 0 { tm.min = nil } - src, tm.max, err = pkgencoding.DecodeBytes(src) + _, tm.max, err = pkgencoding.DecodeBytes(src) if err != nil { - return nil, fmt.Errorf("cannot unmarshal tagMetadata.max: %w", err) + return fmt.Errorf("cannot unmarshal tagMetadata.max: %w", err) } if len(tm.max) == 0 { tm.max = nil } - return src, nil + return nil } // marshalAppend serializes tagMetadata to bytes and appends to dst (panic version for mustWriteTag). @@ -330,7 +391,7 @@ func (tm *tagMetadata) marshalAppend(dst []byte) []byte { // unmarshalTagMetadata deserializes tag metadata from bytes. 
func unmarshalTagMetadata(data []byte) (*tagMetadata, error) { tm := generateTagMetadata() - _, err := tm.unmarshal(data) + err := tm.unmarshal(data) if err != nil { releaseTagMetadata(tm) return nil, err diff --git a/banyand/trace/block.go b/banyand/trace/block.go index b4e05733b..0db54fdd1 100644 --- a/banyand/trace/block.go +++ b/banyand/trace/block.go @@ -465,7 +465,8 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult) { } for i, t := range bc.tags { values := make([]*modelv1.TagValue, len(bc.spans)) - schemaType, hasSchemaType := bc.schemaTagTypes[t.name] + name := decodeTypedTag(t.name) + schemaType, hasSchemaType := bc.schemaTagTypes[name] for k := range bc.spans { if len(t.values) > k { if hasSchemaType && t.valueType == schemaType { @@ -485,18 +486,32 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { tmpBlock.reset() bc.bm.tagProjection = bc.tagProjection var t map[string]*dataBlock - for _, name := range bc.tagProjection.Names { - for tagName, block := range bc.bm.tags { - if tagName == name { - if t == nil { - t = make(map[string]*dataBlock, len(bc.tagProjection.Names)) + projectionNames := make([]string, len(bc.tagProjection.Names)) + copy(projectionNames, bc.tagProjection.Names) + for i, name := range bc.tagProjection.Names { + for typedTag, block := range bc.bm.tags { + if decodeTypedTag(typedTag) != name { + continue + } + if hasTypeSuffix(typedTag) { + schemaType, ok := bc.schemaTagTypes[name] + if !ok { + continue + } + if bmType, ok := bc.bm.tagType[typedTag]; !ok || bmType != schemaType { + continue } - t[name] = block + projectionNames[i] = typedTag + } + if t == nil { + t = make(map[string]*dataBlock, len(bc.tagProjection.Names)) } + t[typedTag] = block } } bc.bm.tags = t + bc.bm.tagProjection = &model.TagProjection{Names: projectionNames} tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm) if len(tmpBlock.spans) == 0 { return false @@ -591,6 +606,8 @@ func fastTagAppend(bi, b *blockPointer, offset int) error { return 
fmt.Errorf("unexpected tag name for tag %q: got %q; want %q", bi.tags[i].name, b.tags[i].name, bi.tags[i].name) } + } + for i := range bi.tags { assertIdxAndOffset(b.tags[i].name, len(b.tags[i].values), b.idx, offset) bi.tags[i].values = append(bi.tags[i].values, b.tags[i].values[b.idx:offset]...) } diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go index d4286fd9d..0ee1c05d1 100644 --- a/banyand/trace/merger.go +++ b/banyand/trace/merger.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/cgroups" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -320,6 +321,7 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{} br.init(pii) bw := generateBlockWriter() bw.mustInitForFilePart(fileSystem, dstPath, shouldCache, int(traceSize)) + conflictTags := collectConflictTags(parts) var minTimestamp, maxTimestamp int64 for i, pw := range parts { @@ -337,7 +339,7 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{} } } - pm, tf, tt, err := mergeBlocks(closeCh, bw, br) + pm, tf, tt, err := mergeBlocks(closeCh, bw, br, conflictTags) releaseBlockWriter(bw) releaseBlockReader(br) for i := range pii { @@ -364,7 +366,30 @@ var errClosed = fmt.Errorf("the merger is closed") // forceSlowMerge is used for testing to disable the fast raw merge path. 
var forceSlowMerge = false -func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*partMetadata, *traceIDFilter, *tagType, error) { +func collectConflictTags(parts []*partWrapper) map[string]struct{} { + tagTypes := make(map[string]map[pbv1.ValueType]struct{}) + for _, pw := range parts { + for tag, vt := range pw.p.tagType { + t := decodeTypedTag(tag) + if tagTypes[t] == nil { + tagTypes[t] = make(map[pbv1.ValueType]struct{}) + } + tagTypes[t][vt] = struct{}{} + } + } + var result map[string]struct{} + for tag, types := range tagTypes { + if len(types) > 1 { + if result == nil { + result = make(map[string]struct{}) + } + result[tag] = struct{}{} + } + } + return result +} + +func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader, conflictTags map[string]struct{}) (*partMetadata, *traceIDFilter, *tagType, error) { pendingBlockIsEmpty := true pendingBlock := generateBlockPointer() defer releaseBlockPointer(pendingBlock) @@ -383,6 +408,14 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa decoder = nil } } + loadAndRename := func() { + br.loadBlockData(getDecoder()) + renameConflictTags(&br.block.block, conflictTags) + } + readAndRename := func(bm *blockMetadata) { + br.mustReadRaw(&rawBlk, bm) + renameRawConflictTags(&rawBlk, conflictTags) + } for br.nextBlockMetadata() { select { case <-closeCh: @@ -395,13 +428,13 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa nextB := br.peek() if !forceSlowMerge && pendingBlockIsEmpty && (nextB == nil || nextB.bm.traceID != b.bm.traceID) { // fast path: only a single block for the trace id and no pending data - br.mustReadRaw(&rawBlk, &b.bm) + readAndRename(&b.bm) bw.mustWriteRawBlock(&rawBlk) continue } if pendingBlockIsEmpty { - br.loadBlockData(getDecoder()) + loadAndRename() pendingBlock.copyFrom(b) pendingBlockIsEmpty = false continue @@ -416,12 +449,12 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa nextB = br.peek() if !forceSlowMerge && (nextB == nil || nextB.bm.traceID != b.bm.traceID) { // fast path: only a single block for this new trace id - br.mustReadRaw(&rawBlk, &b.bm) + readAndRename(&b.bm) bw.mustWriteRawBlock(&rawBlk) continue } // Slow path: start accumulating the new block - br.loadBlockData(getDecoder()) + loadAndRename() pendingBlock.copyFrom(b) pendingBlockIsEmpty = false continue @@ -433,7 +466,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa } tmpBlock.reset() tmpBlock.bm.traceID = b.bm.traceID - br.loadBlockData(getDecoder()) + loadAndRename() mergeTwoBlocks(tmpBlock, pendingBlock, b) if tmpBlock.block.spanSize() <= maxUncompressedSpanSize { if len(tmpBlock.spans) == 0 { @@ -466,3 +499,40 @@ func mergeTwoBlocks(target, left, right *blockPointer) { target.appendAll(left) target.appendAll(right) } + +func renameConflictTags(b *block, conflictTags map[string]struct{}) { + if len(conflictTags) == 0 { + return + } + for i := range b.tags { + if _, ok := conflictTags[b.tags[i].name]; ok { + b.tags[i].name = encodeTypedTag(b.tags[i].name, b.tags[i].valueType) + } + } +} + +func renameRawConflictTags(r *rawBlock, conflictTags map[string]struct{}) { + if len(conflictTags) == 0 { + return + } + bm := r.bm + for tag := range conflictTags { + if _, ok := bm.tags[tag]; !ok { + continue + } + valueType := bm.tagType[tag] + typedTag := encodeTypedTag(tag, valueType) + bm.tags[typedTag] = bm.tags[tag] + delete(bm.tags, tag) + bm.tagType[typedTag] = valueType + delete(bm.tagType, tag) + if rawData, ok := r.tags[tag]; ok { + r.tags[typedTag] = rawData + delete(r.tags, tag) + } + if rawMeta, ok := r.tagMetadata[tag]; ok { + r.tagMetadata[typedTag] = rawMeta + delete(r.tagMetadata, tag) + } + } +} diff --git a/banyand/trace/merger_bench_test.go b/banyand/trace/merger_bench_test.go index fb2aae7f1..601644c21 100644 --- a/banyand/trace/merger_bench_test.go +++ b/banyand/trace/merger_bench_test.go @@ 
-119,7 +119,7 @@ func Benchmark_mergeBlocks_FastRawMerge(b *testing.B) {
 
 		closeCh := make(chan struct{})
 
-		_, _, _, err := mergeBlocks(closeCh, bw, br)
+		_, _, _, err := mergeBlocks(closeCh, bw, br, nil)
 		if err != nil {
 			b.Fatal(err)
 		}
@@ -226,7 +226,7 @@ func Benchmark_mergeBlocks_OriginalMerge(b *testing.B) {
 
 		closeCh := make(chan struct{})
 
-		_, _, _, err := mergeBlocks(closeCh, bw, br)
+		_, _, _, err := mergeBlocks(closeCh, bw, br, nil)
 		if err != nil {
 			b.Fatal(err)
 		}
@@ -331,7 +331,7 @@ func Benchmark_mergeBlocks_Comparison(b *testing.B) {
 
 			closeCh := make(chan struct{})
 
-			_, _, _, err := mergeBlocks(closeCh, bw, br)
+			_, _, _, err := mergeBlocks(closeCh, bw, br, nil)
 			if err != nil {
 				b.Fatal(err)
 			}
@@ -377,7 +377,7 @@ func Benchmark_mergeBlocks_Comparison(b *testing.B) {
 
 			closeCh := make(chan struct{})
 
-			_, _, _, err := mergeBlocks(closeCh, bw, br)
+			_, _, _, err := mergeBlocks(closeCh, bw, br, nil)
 			if err != nil {
 				b.Fatal(err)
 			}
diff --git a/banyand/trace/merger_test.go b/banyand/trace/merger_test.go
index f162b400e..e2c5c4e9f 100644
--- a/banyand/trace/merger_test.go
+++ b/banyand/trace/merger_test.go
@@ -269,7 +269,7 @@ func Test_mergeBlocks_fastPath(t *testing.T) {
 			closeCh := make(chan struct{})
 			defer close(closeCh)
 
-			pm, tf, tagTypes, err := mergeBlocks(closeCh, bw, br)
+			pm, tf, tagTypes, err := mergeBlocks(closeCh, bw, br, nil)
 			require.NoError(t, err)
 			require.NotNil(t, pm)
 			require.NotNil(t, tf)
@@ -1456,3 +1456,163 @@ func Test_flush_cleansUpOnSidxFlushError(t *testing.T) {
 	sidxPartPath := filepath.Join(tmpPath, sidxDirName, "idx1", fmt.Sprintf("%016x", partID))
 	require.False(t, fileSystem.IsExist(sidxPartPath), "sidx part directory should be removed on sidx flush failure")
 }
+
+// Test_mergePartsWithConflictingTagTypes merges parts in which the same tag name
+// carries different value types and checks that the merged part records one
+// suffixed tagType entry per type (e.g. "state#int", "state#str") and keeps no
+// tagType entry, data file, or metadata file under the bare tag name.
+func Test_mergePartsWithConflictingTagTypes(t *testing.T) {
+	tests := []struct {
+		name         string
+		parts        []*traces
+		wantTagTypes map[string]pbv1.ValueType
+		wantDeleted  []string
+	}{
+		{
+			name: "multiple parts with int and str for same tag",
+			parts: []*traces{
+				{
+					traceIDs:   []string{"trace1"},
+					timestamps: []int64{1},
+					tags: [][]*tagValue{
+						{
+							{tag: "state", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(100)},
+							{tag: "name", valueType: pbv1.ValueTypeStr, value: []byte("svc1")},
+						},
+					},
+					spans:   [][]byte{[]byte("span1")},
+					spanIDs: []string{"span1"},
+				},
+				{
+					traceIDs:   []string{"trace2"},
+					timestamps: []int64{2},
+					tags: [][]*tagValue{
+						{
+							{tag: "state", valueType: pbv1.ValueTypeStr, value: []byte("active")},
+							{tag: "name", valueType: pbv1.ValueTypeStr, value: []byte("svc2")},
+						},
+					},
+					spans:   [][]byte{[]byte("span2")},
+					spanIDs: []string{"span2"},
+				},
+			},
+			wantTagTypes: map[string]pbv1.ValueType{
+				"state#int": pbv1.ValueTypeInt64,
+				"state#str": pbv1.ValueTypeStr,
+				"name":      pbv1.ValueTypeStr,
+			},
+			wantDeleted: []string{"state"},
+		},
+		{
+			name: "merge typed-suffix part with plain part",
+			parts: []*traces{
+				{
+					traceIDs:   []string{"trace1"},
+					timestamps: []int64{1},
+					tags: [][]*tagValue{
+						{{tag: "state#int", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(100)}},
+					},
+					spans:   [][]byte{[]byte("span1")},
+					spanIDs: []string{"span1"},
+				},
+				{
+					traceIDs:   []string{"trace2"},
+					timestamps: []int64{2},
+					tags: [][]*tagValue{
+						{{tag: "state", valueType: pbv1.ValueTypeStr, value: []byte("pending")}},
+					},
+					spans:   [][]byte{[]byte("span2")},
+					spanIDs: []string{"span2"},
+				},
+			},
+			wantTagTypes: map[string]pbv1.ValueType{
+				"state#int": pbv1.ValueTypeInt64,
+				"state#str": pbv1.ValueTypeStr,
+			},
+			wantDeleted: []string{"state"},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tmpPath, defFn := test.Space(require.New(t))
+			defer defFn()
+			fileSystem := fs.NewLocalFileSystem()
+
+			partIDs := make([]uint64, len(tt.parts))
+			for idx := range partIDs {
+				partIDs[idx] = uint64(idx)
+			}
+			dstID := uint64(999)
+			dstPath := mergePartsWithConflictTags(t, fileSystem, tmpPath, tt.parts, partIDs, dstID)
+
+			mergedPart := mustOpenFilePart(dstID, tmpPath, fileSystem)
+			for tagName, wantType := range tt.wantTagTypes {
+				gotType, exists := mergedPart.tagType[tagName]
+				require.True(t, exists, "expected tag type entry for %q", tagName)
+				require.Equal(t, wantType, gotType, "tag type mismatch for %q", tagName)
+			}
+			for _, tagName := range tt.wantDeleted {
+				_, ok := mergedPart.tagType[tagName]
+				require.False(t, ok, "tag %q should not exist in tagType after merge", tagName)
+				dataFile := filepath.Join(dstPath, tagName+tagsFilenameExt)
+				require.False(t, fileSystem.IsExist(dataFile), "tag data file %q should not exist", dataFile)
+				metaFile := filepath.Join(dstPath, tagName+tagsMetadataFilenameExt)
+				require.False(t, fileSystem.IsExist(metaFile), "tag metadata file %q should not exist", metaFile)
+			}
+		})
+	}
+}
+
+// mergePartsWithConflictTags flushes every element of tsList to a file part under
+// tmpPath, merges those parts into part dstID with mergeBlocks (passing the output
+// of collectConflictTags), writes the merged part's metadata, trace ID filter, and
+// tag types, and returns the destination part path.
+func mergePartsWithConflictTags(t *testing.T, fileSystem fs.FileSystem, tmpPath string, tsList []*traces, partIDs []uint64, dstID uint64) string {
+	t.Helper()
+	parts := make([]*partWrapper, 0, len(tsList))
+	for i, ts := range tsList {
+		mp := generateMemPart()
+		mp.mustInitFromTraces(ts)
+		mp.mustFlush(fileSystem, partPath(tmpPath, partIDs[i]))
+		p := mustOpenFilePart(partIDs[i], tmpPath, fileSystem)
+		parts = append(parts, &partWrapper{p: p})
+		releaseMemPart(mp)
+	}
+
+	conflictTags := collectConflictTags(parts)
+
+	pmi := make([]*partMergeIter, 0, len(tsList))
+	var traceSize uint64
+	for i := range tsList {
+		p := mustOpenFilePart(partIDs[i], tmpPath, fileSystem)
+		iter := generatePartMergeIter()
+		iter.mustInitFromPart(p)
+		pmi = append(pmi, iter)
+		traceSize += uint64(len(tsList[i].traceIDs))
+	}
+
+	br := generateBlockReader()
+	br.init(pmi)
+	bw := generateBlockWriter()
+	dstPath := partPath(tmpPath, dstID)
+	bw.mustInitForFilePart(fileSystem, dstPath, false, int(traceSize))
+
+	closeCh := make(chan struct{})
+	pm, tf, tagTypes, mergeErr := mergeBlocks(closeCh, bw, br, conflictTags)
+	close(closeCh)
+	require.NoError(t, mergeErr)
+	require.NotNil(t, pm)
+
+	releaseBlockWriter(bw)
+	releaseBlockReader(br)
+	for _, iter := range pmi {
+		releasePartMergeIter(iter)
+	}
+
+	pm.mustWriteMetadata(fileSystem, dstPath)
+	tf.mustWriteTraceIDFilter(fileSystem, dstPath)
+	tagTypes.mustWriteTagType(fileSystem, dstPath)
+	fileSystem.SyncPath(dstPath)
+	return dstPath
+}
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 801b70872..7845d0eb7 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -252,13 +252,14 @@ func (t *trace) prepareSIDXStreaming(
 	}
 
 	req := sidx.QueryRequest{
-		Filter:       tqo.SkippingFilter,
-		TagFilter:    tqo.TagFilter,
-		Order:        tqo.Order,
-		MaxBatchSize: tqo.MaxTraceSize,
-		MinKey:       &tqo.MinVal,
-		MaxKey:       &tqo.MaxVal,
-		SeriesIDs:    seriesIDs,
+		Filter:         tqo.SkippingFilter,
+		TagFilter:      tqo.TagFilter,
+		Order:          tqo.Order,
+		MaxBatchSize:   tqo.MaxTraceSize,
+		MinKey:         &tqo.MinVal,
+		MaxKey:         &tqo.MaxVal,
+		SeriesIDs:      seriesIDs,
+		SchemaTagTypes: qo.schemaTagTypes,
 	}
 	if tqo.TagProjection != nil {
 		req.TagProjection = []model.TagProjection{*tqo.TagProjection}
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 77dd2f873..13f4e94e4 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -95,6 +95,7 @@ func (s *standalone) FlagSet() *run.FlagSet {
 	fs.IntVar(&s.maxFileSnapshotNum, "trace-max-file-snapshot-num", 10, "the maximum number of file snapshots")
 	fs.DurationVar(&s.minFileSnapshotAge, "trace-min-file-snapshot-age", time.Hour, "minimum age for file snapshots to be eligible for deletion")
 	s.option.mergePolicy = newDefaultMergePolicy()
+	fs.IntVar(&s.option.mergePolicy.maxParts, "trace-max-merge-parts", s.option.mergePolicy.maxParts, "the maximum number of parts to merge at once")
 	fs.VarP(&s.option.mergePolicy.maxFanOutSize, "trace-max-fan-out-size", "", "the upper bound of a single file size after merge of trace")
 	// Additional flags can be added here
 	return fs
diff --git a/banyand/trace/tag.go b/banyand/trace/tag.go
index 647358da7..31c612f9b 100644
--- a/banyand/trace/tag.go
+++ b/banyand/trace/tag.go
@@ -18,6 +18,8 @@
 package trace
 
 import (
+	"strings"
+
 	internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/bytes"
 	pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -26,6 +28,74 @@ import (
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
+// typedTagSeparator separates a tag name from its value-type suffix, e.g. "state#int".
+const typedTagSeparator = "#"
+
+var (
+	// valueTypeToSuffix holds the suffix encodeTypedTag appends for a value type.
+	// ValueTypeUnknown maps to "", so such tags keep their plain name.
+	valueTypeToSuffix = map[pbv1.ValueType]string{
+		pbv1.ValueTypeStr:        "str",
+		pbv1.ValueTypeInt64:      "int",
+		pbv1.ValueTypeFloat64:    "float",
+		pbv1.ValueTypeBinaryData: "bin",
+		pbv1.ValueTypeStrArr:     "str_arr",
+		pbv1.ValueTypeInt64Arr:   "int_arr",
+		pbv1.ValueTypeTimestamp:  "ts",
+		pbv1.ValueTypeUnknown:    "",
+	}
+	// suffixToValueType is the reverse mapping, used to recognize a type suffix.
+	suffixToValueType = map[string]pbv1.ValueType{
+		"str":     pbv1.ValueTypeStr,
+		"int":     pbv1.ValueTypeInt64,
+		"float":   pbv1.ValueTypeFloat64,
+		"bin":     pbv1.ValueTypeBinaryData,
+		"str_arr": pbv1.ValueTypeStrArr,
+		"int_arr": pbv1.ValueTypeInt64Arr,
+		"ts":      pbv1.ValueTypeTimestamp,
+	}
+)
+
+// encodeTypedTag returns name followed by the separator and the suffix registered
+// for vt. It returns name unchanged when vt has no suffix or an empty one.
+func encodeTypedTag(name string, vt pbv1.ValueType) string {
+	suffix, ok := valueTypeToSuffix[vt]
+	if !ok || suffix == "" {
+		return name
+	}
+	return name + typedTagSeparator + suffix
+}
+
+// typedSuffixStart returns the index in key of the separator that starts a
+// registered type suffix, or -1 when key does not end with one. No registered
+// suffix contains the separator, so only the text after the last separator can
+// match; one map lookup replaces a scan over every suffix.
+func typedSuffixStart(key string) int {
+	idx := strings.LastIndex(key, typedTagSeparator)
+	if idx < 0 {
+		return -1
+	}
+	if _, ok := suffixToValueType[key[idx+len(typedTagSeparator):]]; !ok {
+		return -1
+	}
+	return idx
+}
+
+// decodeTypedTag strips a trailing separator-plus-suffix from key when the suffix
+// is a registered one and returns key unchanged otherwise.
+func decodeTypedTag(key string) string {
+	if idx := typedSuffixStart(key); idx >= 0 {
+		return key[:idx]
+	}
+	return key
+}
+
+// hasTypeSuffix reports whether key ends with the separator followed by a
+// registered type suffix.
+func hasTypeSuffix(key string) bool {
+	return typedSuffixStart(key) >= 0
+}
+
 type tag struct {
 	name   string
 	values [][]byte
diff --git a/banyand/trace/tag_test.go b/banyand/trace/tag_test.go
index 2ac463d74..fce0a0233 100644
--- a/banyand/trace/tag_test.go
+++ b/banyand/trace/tag_test.go
@@ -34,6 +34,37 @@ func timeToBytes(t time.Time) []byte {
 	return pkgencoding.Int64ToBytes(nil, t.UnixNano())
 }
 
+func TestEncodeTypedTag(t *testing.T) {
+	assert.Equal(t, "state#int", encodeTypedTag("state", pbv1.ValueTypeInt64))
+	assert.Equal(t, "state#str", encodeTypedTag("state", pbv1.ValueTypeStr))
+	assert.Equal(t, "http.status#int", encodeTypedTag("http.status", pbv1.ValueTypeInt64))
+	assert.Equal(t, "data#bin", encodeTypedTag("data", pbv1.ValueTypeBinaryData))
+	assert.Equal(t, "unknown", encodeTypedTag("unknown", pbv1.ValueTypeUnknown))
+}
+
+func TestDecodeTypedTag(t *testing.T) {
+	assert.Equal(t, "state", decodeTypedTag("state#int"))
+	assert.Equal(t, "state", decodeTypedTag("state#str"))
+	assert.Equal(t, "http.status", decodeTypedTag("http.status#int"))
+	assert.Equal(t, "http.status", decodeTypedTag("http.status"))
+	assert.Equal(t, "simple", decodeTypedTag("simple"))
+	assert.Equal(t, "my.str", decodeTypedTag("my.str"))
+	assert.Equal(t, "arr.int", decodeTypedTag("arr.int"))
+	assert.Equal(t, "data.bin", decodeTypedTag("data.bin"))
+	assert.Equal(t, "tags", decodeTypedTag("tags#str_arr"))
+	assert.Equal(t, "a#str", decodeTypedTag("a#str#int"))
+	assert.Equal(t, "state#x", decodeTypedTag("state#x"))
+	assert.Equal(t, "state#", decodeTypedTag("state#"))
+}
+
+func TestHasTypeSuffix(t *testing.T) {
+	assert.True(t, hasTypeSuffix("state#int"))
+	assert.True(t, hasTypeSuffix("tags#int_arr"))
+	assert.False(t, hasTypeSuffix("state"))
+	assert.False(t, hasTypeSuffix("state#x"))
+	assert.False(t, hasTypeSuffix("data.bin"))
+}
+
 func TestTagEncodingDecoding(t *testing.T) {
 	t.Run("test int64 tag encoding/decoding", func(t *testing.T) {
 		tag := &tag{
diff --git a/banyand/trace/trace_suite_test.go b/banyand/trace/trace_suite_test.go
index daefee2bd..9e69e4790 100644
--- a/banyand/trace/trace_suite_test.go
+++ b/banyand/trace/trace_suite_test.go
@@ -90,7 +90,7 @@ func setUp() (*services, func()) {
 	flags = append(flags, "--metadata-root-path="+metaPath)
 	rootPath, deferFunc, err := test.NewSpace()
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
-	flags = append(flags, "--trace-root-path="+rootPath)
+	flags = append(flags, "--trace-root-path="+rootPath, "--trace-max-merge-parts=2")
 	listenClientURL, listenPeerURL, err := test.NewEtcdListenUrls()
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	flags = append(flags, "--etcd-listen-client-url="+listenClientURL, "--etcd-listen-peer-url="+listenPeerURL,
