This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c9961ab39ab8bb9680117772ba05131506b63809
Author: Huang Youliang <[email protected]>
AuthorDate: Tue Apr 21 10:06:35 2026 +0800

    Support multiple types for the same tag in a single trace or sidx part 
(#1066)
    
    * Support multiple types for the same tag in a single part
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 api/validate/validate.go              |  26 ++++++
 banyand/internal/sidx/block.go        |   2 +
 banyand/internal/sidx/interfaces.go   |  38 +++++----
 banyand/internal/sidx/merge.go        |  84 +++++++++++++++++--
 banyand/internal/sidx/merge_test.go   | 121 ++++++++++++++++++++++++++
 banyand/internal/sidx/query_result.go |  26 ++++--
 banyand/internal/sidx/scan_query.go   |  17 ++--
 banyand/internal/sidx/tag.go          |  79 +++++++++++++++--
 banyand/trace/block.go                |  31 +++++--
 banyand/trace/merger.go               |  84 +++++++++++++++++--
 banyand/trace/merger_bench_test.go    |   8 +-
 banyand/trace/merger_test.go          | 154 +++++++++++++++++++++++++++++++++-
 banyand/trace/query.go                |  15 ++--
 banyand/trace/svc_standalone.go       |   1 +
 banyand/trace/tag.go                  |  53 ++++++++++++
 banyand/trace/tag_test.go             |  19 +++++
 banyand/trace/trace_suite_test.go     |   2 +-
 17 files changed, 687 insertions(+), 73 deletions(-)

diff --git a/api/validate/validate.go b/api/validate/validate.go
index 61e7b5d75..946c99061 100644
--- a/api/validate/validate.go
+++ b/api/validate/validate.go
@@ -20,11 +20,22 @@ package validate
 
 import (
        "errors"
+       "fmt"
+       "strings"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 )
 
+const reservedTagSeparator = "#"
+
+func validateTagName(name string) error {
+       if strings.Contains(name, reservedTagSeparator) {
+               return fmt.Errorf("tag name %q must not contain reserved 
character %q", name, reservedTagSeparator)
+       }
+       return nil
+}
+
 // Group validates the provided Group object.
 func Group(group *commonv1.Group) error {
        if group.Metadata == nil {
@@ -188,16 +199,28 @@ func Trace(trace *databasev1.Trace) error {
        if trace.TraceIdTagName == "" {
                return errors.New("trace_id_tag_name is empty")
        }
+       if err := validateTagName(trace.TraceIdTagName); err != nil {
+               return err
+       }
        if trace.SpanIdTagName == "" {
                return errors.New("span_id_tag_name is empty")
        }
+       if err := validateTagName(trace.SpanIdTagName); err != nil {
+               return err
+       }
        if trace.TimestampTagName == "" {
                return errors.New("timestamp_tag_name is empty")
        }
+       if err := validateTagName(trace.TimestampTagName); err != nil {
+               return err
+       }
        for i := range trace.Tags {
                if trace.Tags[i].Name == "" {
                        return errors.New("trace tag name is empty")
                }
+               if err := validateTagName(trace.Tags[i].Name); err != nil {
+                       return err
+               }
                if trace.Tags[i].Type == 
databasev1.TagType_TAG_TYPE_UNSPECIFIED {
                        return errors.New("trace tag type is unspecified")
                }
@@ -236,6 +259,9 @@ func tagFamily(tagFamilies []*databasev1.TagFamilySpec) 
error {
                        if tagFamilies[i].Tags[j].Name == "" {
                                return errors.New("tag name is empty")
                        }
+                       if err := validateTagName(tagFamilies[i].Tags[j].Name); 
err != nil {
+                               return err
+                       }
                        if tagFamilies[i].Tags[j].Type == 
databasev1.TagType_TAG_TYPE_UNSPECIFIED {
                                return errors.New("tag type is unspecified")
                        }
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index c240dbbfc..3522b474b 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -482,6 +482,8 @@ func fastTagAppend(bi, b *blockPointer, offset int) error {
                if _, exists := b.tags[t.name]; !exists {
                        return fmt.Errorf("unexpected tag name for tag %q", 
t.name)
                }
+       }
+       for _, t := range bi.tags {
                assertIdxAndOffset(t.name, len(b.tags[t.name].values), b.idx, 
offset)
                bi.tags[t.name].values = append(bi.tags[t.name].values, 
b.tags[t.name].values[b.idx:offset]...)
        }
diff --git a/banyand/internal/sidx/interfaces.go 
b/banyand/internal/sidx/interfaces.go
index 058c8626f..215328507 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -105,16 +105,17 @@ type WriteRequest struct {
 
 // QueryRequest specifies parameters for a query operation, following 
StreamQueryOptions pattern.
 type QueryRequest struct {
-       Filter        index.Filter
-       TagFilter     model.TagFilterMatcher
-       Order         *index.OrderBy
-       MinKey        *int64
-       MaxKey        *int64
-       MinTimestamp  *int64
-       MaxTimestamp  *int64
-       SeriesIDs     []common.SeriesID
-       TagProjection []model.TagProjection
-       MaxBatchSize  int
+       Filter         index.Filter
+       TagFilter      model.TagFilterMatcher
+       Order          *index.OrderBy
+       MinKey         *int64
+       MaxKey         *int64
+       SchemaTagTypes map[string]pbv1.ValueType
+       MinTimestamp   *int64
+       MaxTimestamp   *int64
+       SeriesIDs      []common.SeriesID
+       TagProjection  []model.TagProjection
+       MaxBatchSize   int
 }
 
 // ScanProgressFunc is a callback for reporting scan progress.
@@ -127,14 +128,15 @@ type ScanProgressFunc func(currentPart, totalParts int, 
rowsFound int)
 //
 //nolint:govet // struct layout optimized for readability; 64 bytes is 
acceptable
 type ScanQueryRequest struct {
-       TagFilter     model.TagFilterMatcher
-       OnProgress    ScanProgressFunc
-       MinKey        *int64
-       MaxKey        *int64
-       MinTimestamp  *int64
-       MaxTimestamp  *int64
-       TagProjection []model.TagProjection
-       MaxBatchSize  int
+       TagFilter      model.TagFilterMatcher
+       SchemaTagTypes map[string]pbv1.ValueType
+       OnProgress     ScanProgressFunc
+       MinKey         *int64
+       MaxKey         *int64
+       MinTimestamp   *int64
+       MaxTimestamp   *int64
+       TagProjection  []model.TagProjection
+       MaxBatchSize   int
 }
 
 // QueryResponse contains a batch of query results and execution metadata.
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index ac901a2b5..83c18f3a4 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -19,9 +19,11 @@ package sidx
 
 import (
        "fmt"
+       "io"
 
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 var (
@@ -80,6 +82,7 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh 
<-chan struct{}, par
                return nil, errNoPartToMerge
        }
        dstPath := partPath(root, partID)
+       conflictTags := collectConflictTags(parts)
        var totalSize int64
        pii := make([]*partMergeIter, 0, len(parts))
        for i := range parts {
@@ -94,7 +97,7 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh 
<-chan struct{}, par
        bw := generateBlockWriter()
        bw.mustInitForFilePart(fileSystem, dstPath, shouldCache)
 
-       pm, err := mergeBlocks(closeCh, bw, br)
+       pm, err := mergeBlocks(closeCh, bw, br, conflictTags)
        releaseBlockWriter(bw)
        releaseBlockReader(br)
        for i := range pii {
@@ -133,7 +136,7 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh 
<-chan struct{}, par
        return newPartWrapper(nil, p), nil
 }
 
-func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) 
(*partMetadata, error) {
+func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader, 
conflictTags map[string]struct{}) (*partMetadata, error) {
        pendingBlockIsEmpty := true
        pendingBlock := generateBlockPointer()
        defer releaseBlockPointer(pendingBlock)
@@ -151,6 +154,10 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
                        decoder = nil
                }
        }
+       loadAndRename := func() {
+               br.loadBlockData(getDecoder())
+               renameConflictTags(&br.block.block, conflictTags)
+       }
        for br.nextBlockMetadata() {
                select {
                case <-closeCh:
@@ -160,7 +167,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
                b := br.block
 
                if pendingBlockIsEmpty {
-                       br.loadBlockData(getDecoder())
+                       loadAndRename()
                        pendingBlock.copyFrom(b)
                        pendingBlockIsEmpty = false
                        continue
@@ -171,7 +178,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
                        pendingBlock.block.uncompressedSizeBytes() >= 
maxUncompressedBlockSize {
                        bw.mustWriteBlock(pendingBlock.bm.seriesID, 
&pendingBlock.block)
                        releaseDecoder()
-                       br.loadBlockData(getDecoder())
+                       loadAndRename()
                        pendingBlock.copyFrom(b)
                        continue
                }
@@ -182,7 +189,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
                }
                tmpBlock.reset()
                tmpBlock.bm.seriesID = b.bm.seriesID
-               br.loadBlockData(getDecoder())
+               loadAndRename()
                mergeTwoBlocks(tmpBlock, pendingBlock, b)
                if len(tmpBlock.userKeys) <= maxBlockLength && 
tmpBlock.block.uncompressedSizeBytes() <= maxUncompressedBlockSize {
                        if len(tmpBlock.userKeys) == 0 {
@@ -209,6 +216,73 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
        return &result, nil
 }
 
+func collectConflictTags(parts []*partWrapper) map[string]struct{} {
+       tagTypes := make(map[string]map[pbv1.ValueType]struct{})
+       for _, pw := range parts {
+               p := pw.p
+               for tt := range p.tagMetadata {
+                       t := tt
+                       var vt pbv1.ValueType
+                       if hasTypeSuffix(tt) {
+                               t = decodeTypedTag(tt)
+                               vt = valueType(tt)
+                       } else {
+                               var readErr error
+                               vt, readErr = 
readFirstTagValueType(p.tagMetadata[tt])
+                               if readErr != nil {
+                                       continue
+                               }
+                       }
+                       if tagTypes[t] == nil {
+                               tagTypes[t] = make(map[pbv1.ValueType]struct{})
+                       }
+                       tagTypes[t][vt] = struct{}{}
+               }
+       }
+       var result map[string]struct{}
+       for tag, types := range tagTypes {
+               if len(types) > 1 {
+                       if result == nil {
+                               result = make(map[string]struct{})
+                       }
+                       result[tag] = struct{}{}
+               }
+       }
+       return result
+}
+
+func readFirstTagValueType(tmReader fs.Reader) (pbv1.ValueType, error) {
+       sr := tmReader.SequentialRead()
+       defer fs.MustClose(sr)
+       buf := make([]byte, 512)
+       n, readErr := io.ReadFull(sr, buf)
+       if readErr != nil && n == 0 {
+               return pbv1.ValueTypeUnknown, fmt.Errorf("cannot read tag 
metadata: %w", readErr)
+       }
+       tm := generateTagMetadata()
+       defer releaseTagMetadata(tm)
+       if unmarshalErr := tm.unmarshal(buf[:n]); unmarshalErr != nil {
+               return pbv1.ValueTypeUnknown, fmt.Errorf("cannot unmarshal tag 
metadata: %w", unmarshalErr)
+       }
+       return tm.valueType, nil
+}
+
+func renameConflictTags(b *block, conflictTags map[string]struct{}) {
+       if len(conflictTags) == 0 {
+               return
+       }
+       for t := range conflictTags {
+               td, exists := b.tags[t]
+               if !exists {
+                       continue
+               }
+               typedName := encodeTypedTag(t, td.valueType)
+               td.name = typedName
+               b.tags[typedName] = td
+               delete(b.tags, t)
+       }
+}
+
 func mergeTwoBlocks(target, left, right *blockPointer) {
        appendIfEmpty := func(ib1, ib2 *blockPointer) bool {
                if ib1.idx >= len(ib1.userKeys) {
diff --git a/banyand/internal/sidx/merge_test.go 
b/banyand/internal/sidx/merge_test.go
index b62f081c0..64c126aa9 100644
--- a/banyand/internal/sidx/merge_test.go
+++ b/banyand/internal/sidx/merge_test.go
@@ -29,6 +29,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -482,6 +483,126 @@ func Test_mergeParts(t *testing.T) {
        }
 }
 
+func Test_mergePartsWithConflictTags(t *testing.T) {
+       esConflictStr := func() *elements {
+               es := generateElements()
+               es.mustAppend(1, 100, make([]byte, 10), []Tag{
+                       {Name: "status", Value: []byte("ok"), ValueType: 
pbv1.ValueTypeStr},
+               })
+               es.mustAppend(2, 200, make([]byte, 10), []Tag{
+                       {Name: "status", Value: []byte("error"), ValueType: 
pbv1.ValueTypeStr},
+               })
+               es.mustAppend(3, 300, make([]byte, 10), []Tag{
+                       {Name: "status", Value: []byte("warn"), ValueType: 
pbv1.ValueTypeStr},
+               })
+               return es
+       }()
+
+       esConflictInt := func() *elements {
+               es := generateElements()
+               es.mustAppend(1, 150, make([]byte, 10), []Tag{
+                       {Name: "status", Value: convert.Int64ToBytes(200), 
ValueType: pbv1.ValueTypeInt64},
+               })
+               es.mustAppend(2, 250, make([]byte, 10), []Tag{
+                       {Name: "status", Value: convert.Int64ToBytes(500), 
ValueType: pbv1.ValueTypeInt64},
+               })
+               es.mustAppend(3, 350, make([]byte, 10), []Tag{
+                       {Name: "status", Value: convert.Int64ToBytes(404), 
ValueType: pbv1.ValueTypeInt64},
+               })
+               return es
+       }()
+
+       verify := func(t *testing.T, pp []*partWrapper, fileSystem 
fs.FileSystem, root string, partID uint64) {
+               closeCh := make(chan struct{})
+               defer close(closeCh)
+               s := &sidx{pm: protector.Nop{}}
+               p, mergeErr := s.mergeParts(fileSystem, closeCh, pp, partID, 
root)
+               require.NoError(t, mergeErr)
+               defer func() {
+                       if p != nil {
+                               p.release()
+                       }
+               }()
+
+               pmi := &partMergeIter{}
+               pmi.mustInitFromPart(p.p)
+               reader := &blockReader{}
+               reader.init([]*partMergeIter{pmi})
+               decoder := generateTagValuesDecoder()
+               defer releaseTagValuesDecoder(decoder)
+
+               var blockCount int
+               for reader.nextBlockMetadata() {
+                       reader.loadBlockData(decoder)
+                       b := &reader.block.block
+                       blockCount++
+
+                       _, hasBareStatus := b.tags["status"]
+                       require.False(t, hasBareStatus,
+                               "block %d (seriesID=%d) has bare 'status' tag; 
expected renamed tags only",
+                               blockCount, reader.block.bm.seriesID)
+
+                       hasStr := false
+                       hasInt := false
+                       for tagName := range b.tags {
+                               if tagName == "status#str" {
+                                       hasStr = true
+                               }
+                               if tagName == "status#int" {
+                                       hasInt = true
+                               }
+                       }
+                       require.True(t, hasStr || hasInt,
+                               "block %d (seriesID=%d) has neither 
'status#str' nor 'status#int'",
+                               blockCount, reader.block.bm.seriesID)
+               }
+               require.NoError(t, reader.error())
+               require.Equal(t, 3, blockCount, "expected 3 blocks (one per 
seriesID)")
+       }
+
+       esList := []*elements{esConflictStr, esConflictInt}
+
+       t.Run("memory parts", func(t *testing.T) {
+               var pp []*partWrapper
+               tmpPath, defFn := test.Space(require.New(t))
+               defer func() {
+                       for _, pw := range pp {
+                               pw.release()
+                       }
+                       defFn()
+               }()
+               for _, es := range esList {
+                       mp := GenerateMemPart()
+                       mp.mustInitFromElements(es)
+                       pp = append(pp, newPartWrapper(mp, openMemPart(mp)))
+               }
+               verify(t, pp, fs.NewLocalFileSystem(), tmpPath, 1)
+       })
+
+       t.Run("file parts", func(t *testing.T) {
+               var fpp []*partWrapper
+               tmpPath, defFn := test.Space(require.New(t))
+               defer func() {
+                       for _, pw := range fpp {
+                               pw.release()
+                       }
+                       defFn()
+               }()
+               fileSystem := fs.NewLocalFileSystem()
+               for i, es := range esList {
+                       mp := GenerateMemPart()
+                       mp.mustInitFromElements(es)
+                       partPath := filepath.Join(tmpPath, 
"part_"+string(rune('0'+i)))
+                       mp.mustFlush(fileSystem, partPath)
+                       filePart := mustOpenPart(uint64(i), partPath, 
fileSystem)
+                       filePW := newPartWrapper(nil, filePart)
+                       fpp = append(fpp, filePW)
+                       ReleaseMemPart(mp)
+               }
+               verify(t, fpp, fileSystem, tmpPath, uint64(len(esList)))
+       })
+}
+
 func elementsToBlocks(esList []*elements) []block {
        merged := generateElements()
        defer releaseElements(merged)
diff --git a/banyand/internal/sidx/query_result.go 
b/banyand/internal/sidx/query_result.go
index c483058fb..e547d16f0 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -114,16 +114,30 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p 
*part, bm *blockMetadata
        // Load tag data for selected tags only
        for tagName := range tagsToLoad {
                tagBlockInfo, exists := bm.tagsBlocks[tagName]
-               if !exists {
-                       continue // Skip missing tags
+               if exists {
+                       qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, 
int(bm.count), decoder)
+                       continue
                }
-
-               if !qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, 
int(bm.count), decoder) {
-                       // Continue loading other tags even if one fails
+               schemaType, hasSchemaType := qr.request.SchemaTagTypes[tagName]
+               if !hasSchemaType {
                        continue
                }
+               for typedTag, typedBlock := range bm.tagsBlocks {
+                       if decodeTypedTag(typedTag) != tagName {
+                               continue
+                       }
+                       if valueType(typedTag) != schemaType {
+                               continue
+                       }
+                       if qr.loadTagData(tmpBlock, p, typedTag, &typedBlock, 
int(bm.count), decoder) {
+                               td := tmpBlock.tags[typedTag]
+                               td.name = tagName
+                               tmpBlock.tags[tagName] = td
+                               delete(tmpBlock.tags, typedTag)
+                       }
+                       break
+               }
        }
-
        return len(tmpBlock.userKeys) > 0
 }
 
diff --git a/banyand/internal/sidx/scan_query.go 
b/banyand/internal/sidx/scan_query.go
index 4a063d3a8..c72b3c8f6 100644
--- a/banyand/internal/sidx/scan_query.go
+++ b/banyand/internal/sidx/scan_query.go
@@ -151,16 +151,17 @@ func (s *sidx) scanPart(ctx context.Context, pw 
*partWrapper, req ScanQueryReque
                }
 
                // Create a temporary QueryRequest to reuse existing 
blockCursor infrastructure
-               tmpReq := QueryRequest{
-                       TagFilter:     req.TagFilter,
-                       MinKey:        &minKey,
-                       MaxKey:        &maxKey,
-                       TagProjection: req.TagProjection,
-                       MaxBatchSize:  maxBatchSize,
+               qr := QueryRequest{
+                       TagFilter:      req.TagFilter,
+                       SchemaTagTypes: req.SchemaTagTypes,
+                       MinKey:         &minKey,
+                       MaxKey:         &maxKey,
+                       TagProjection:  req.TagProjection,
+                       MaxBatchSize:   maxBatchSize,
                }
 
                bc := generateBlockCursor()
-               bc.init(p, pi.curBlock, tmpReq)
+               bc.init(p, pi.curBlock, qr)
 
                // Load block data
                tmpBlock := generateBlock()
@@ -184,7 +185,7 @@ func (s *sidx) scanPart(ctx context.Context, pw 
*partWrapper, req ScanQueryReque
                if s.loadBlockCursor(bc, tmpBlock, blockScanResult{
                        p:  p,
                        bm: *pi.curBlock,
-               }, tagsToLoad, tmpReq, s.pm, nil) {
+               }, tagsToLoad, qr, s.pm, nil) {
                        // Copy all rows from this block cursor to current batch
                        for idx := 0; idx < len(bc.userKeys); idx++ {
                                bc.idx = idx
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index e1f986060..6686f9c88 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -19,6 +19,7 @@ package sidx
 
 import (
        "fmt"
+       "strings"
 
        internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -28,6 +29,66 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
+const typedTagSeparator = "#"
+
+var (
+       valueTypeToSuffix = map[pbv1.ValueType]string{
+               pbv1.ValueTypeStr:        "str",
+               pbv1.ValueTypeInt64:      "int",
+               pbv1.ValueTypeFloat64:    "float",
+               pbv1.ValueTypeBinaryData: "bin",
+               pbv1.ValueTypeStrArr:     "str_arr",
+               pbv1.ValueTypeInt64Arr:   "int_arr",
+               pbv1.ValueTypeTimestamp:  "ts",
+               pbv1.ValueTypeUnknown:    "",
+       }
+       suffixToValueType = map[string]pbv1.ValueType{
+               "str":     pbv1.ValueTypeStr,
+               "int":     pbv1.ValueTypeInt64,
+               "float":   pbv1.ValueTypeFloat64,
+               "bin":     pbv1.ValueTypeBinaryData,
+               "str_arr": pbv1.ValueTypeStrArr,
+               "int_arr": pbv1.ValueTypeInt64Arr,
+               "ts":      pbv1.ValueTypeTimestamp,
+       }
+)
+
+func encodeTypedTag(name string, vt pbv1.ValueType) string {
+       suffix, ok := valueTypeToSuffix[vt]
+       if !ok || suffix == "" {
+               return name
+       }
+       return name + typedTagSeparator + suffix
+}
+
+func decodeTypedTag(key string) string {
+       for suffix := range suffixToValueType {
+               sepSuffix := typedTagSeparator + suffix
+               if strings.HasSuffix(key, sepSuffix) {
+                       return key[:len(key)-len(sepSuffix)]
+               }
+       }
+       return key
+}
+
+func hasTypeSuffix(key string) bool {
+       for suffix := range suffixToValueType {
+               if strings.HasSuffix(key, typedTagSeparator+suffix) {
+                       return true
+               }
+       }
+       return false
+}
+
+func valueType(name string) pbv1.ValueType {
+       for suffix, vt := range suffixToValueType {
+               if strings.HasSuffix(name, typedTagSeparator+suffix) {
+                       return vt
+               }
+       }
+       return pbv1.ValueTypeUnknown
+}
+
 // dataBlock represents a reference to data in a file.
 type dataBlock struct {
        offset uint64
@@ -278,18 +339,18 @@ func (tm *tagMetadata) marshal(dst []byte) []byte {
 }
 
 // unmarshal deserializes tag metadata from bytes using encoding package.
-func (tm *tagMetadata) unmarshal(src []byte) ([]byte, error) {
+func (tm *tagMetadata) unmarshal(src []byte) error {
        var nameBytes []byte
        var err error
 
        src, nameBytes, err = pkgencoding.DecodeBytes(src)
        if err != nil {
-               return nil, fmt.Errorf("cannot unmarshal tagMetadata.name: %w", 
err)
+               return fmt.Errorf("cannot unmarshal tagMetadata.name: %w", err)
        }
        tm.name = string(nameBytes)
 
        if len(src) < 1 {
-               return nil, fmt.Errorf("cannot unmarshal tagMetadata.valueType: 
src is too short")
+               return fmt.Errorf("cannot unmarshal tagMetadata.valueType: src 
is too short")
        }
        tm.valueType = pbv1.ValueType(src[0])
        src = src[1:]
@@ -300,26 +361,26 @@ func (tm *tagMetadata) unmarshal(src []byte) ([]byte, 
error) {
        src, tm.filterBlock.size = pkgencoding.BytesToVarUint64(src)
 
        if len(src) < 1 {
-               return nil, fmt.Errorf("cannot unmarshal tagMetadata flags: src 
is too short")
+               return fmt.Errorf("cannot unmarshal tagMetadata flags: src is 
too short")
        }
 
        src, tm.min, err = pkgencoding.DecodeBytes(src)
        if err != nil {
-               return nil, fmt.Errorf("cannot unmarshal tagMetadata.min: %w", 
err)
+               return fmt.Errorf("cannot unmarshal tagMetadata.min: %w", err)
        }
        if len(tm.min) == 0 {
                tm.min = nil
        }
 
-       src, tm.max, err = pkgencoding.DecodeBytes(src)
+       _, tm.max, err = pkgencoding.DecodeBytes(src)
        if err != nil {
-               return nil, fmt.Errorf("cannot unmarshal tagMetadata.max: %w", 
err)
+               return fmt.Errorf("cannot unmarshal tagMetadata.max: %w", err)
        }
        if len(tm.max) == 0 {
                tm.max = nil
        }
 
-       return src, nil
+       return nil
 }
 
 // marshalAppend serializes tagMetadata to bytes and appends to dst (panic 
version for mustWriteTag).
@@ -330,7 +391,7 @@ func (tm *tagMetadata) marshalAppend(dst []byte) []byte {
 // unmarshalTagMetadata deserializes tag metadata from bytes.
 func unmarshalTagMetadata(data []byte) (*tagMetadata, error) {
        tm := generateTagMetadata()
-       _, err := tm.unmarshal(data)
+       err := tm.unmarshal(data)
        if err != nil {
                releaseTagMetadata(tm)
                return nil, err
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index b4e05733b..0db54fdd1 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -465,7 +465,8 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult) {
        }
        for i, t := range bc.tags {
                values := make([]*modelv1.TagValue, len(bc.spans))
-               schemaType, hasSchemaType := bc.schemaTagTypes[t.name]
+               name := decodeTypedTag(t.name)
+               schemaType, hasSchemaType := bc.schemaTagTypes[name]
                for k := range bc.spans {
                        if len(t.values) > k {
                                if hasSchemaType && t.valueType == schemaType {
@@ -485,18 +486,32 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
        tmpBlock.reset()
        bc.bm.tagProjection = bc.tagProjection
        var t map[string]*dataBlock
-       for _, name := range bc.tagProjection.Names {
-               for tagName, block := range bc.bm.tags {
-                       if tagName == name {
-                               if t == nil {
-                                       t = make(map[string]*dataBlock, 
len(bc.tagProjection.Names))
+       projectionNames := make([]string, len(bc.tagProjection.Names))
+       copy(projectionNames, bc.tagProjection.Names)
+       for i, name := range bc.tagProjection.Names {
+               for typedTag, block := range bc.bm.tags {
+                       if decodeTypedTag(typedTag) != name {
+                               continue
+                       }
+                       if hasTypeSuffix(typedTag) {
+                               schemaType, ok := bc.schemaTagTypes[name]
+                               if !ok {
+                                       continue
+                               }
+                               if bmType, ok := bc.bm.tagType[typedTag]; !ok 
|| bmType != schemaType {
+                                       continue
                                }
-                               t[name] = block
+                               projectionNames[i] = typedTag
+                       }
+                       if t == nil {
+                               t = make(map[string]*dataBlock, 
len(bc.tagProjection.Names))
                        }
+                       t[typedTag] = block
                }
        }
 
        bc.bm.tags = t
+       bc.bm.tagProjection = &model.TagProjection{Names: projectionNames}
        tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm)
        if len(tmpBlock.spans) == 0 {
                return false
@@ -591,6 +606,8 @@ func fastTagAppend(bi, b *blockPointer, offset int) error {
                        return fmt.Errorf("unexpected tag name for tag %q: got 
%q; want %q",
                                bi.tags[i].name, b.tags[i].name, 
bi.tags[i].name)
                }
+       }
+       for i := range bi.tags {
                assertIdxAndOffset(b.tags[i].name, len(b.tags[i].values), 
b.idx, offset)
                bi.tags[i].values = append(bi.tags[i].values, 
b.tags[i].values[b.idx:offset]...)
        }
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index d4286fd9d..0ee1c05d1 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/cgroups"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
@@ -320,6 +321,7 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, 
closeCh <-chan struct{}
        br.init(pii)
        bw := generateBlockWriter()
        bw.mustInitForFilePart(fileSystem, dstPath, shouldCache, int(traceSize))
+       conflictTags := collectConflictTags(parts)
 
        var minTimestamp, maxTimestamp int64
        for i, pw := range parts {
@@ -337,7 +339,7 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, 
closeCh <-chan struct{}
                }
        }
 
-       pm, tf, tt, err := mergeBlocks(closeCh, bw, br)
+       pm, tf, tt, err := mergeBlocks(closeCh, bw, br, conflictTags)
        releaseBlockWriter(bw)
        releaseBlockReader(br)
        for i := range pii {
@@ -364,7 +366,30 @@ var errClosed = fmt.Errorf("the merger is closed")
 // forceSlowMerge is used for testing to disable the fast raw merge path.
 var forceSlowMerge = false
 
-func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) 
(*partMetadata, *traceIDFilter, *tagType, error) {
+func collectConflictTags(parts []*partWrapper) map[string]struct{} {
+       tagTypes := make(map[string]map[pbv1.ValueType]struct{})
+       for _, pw := range parts {
+               for tag, vt := range pw.p.tagType {
+                       t := decodeTypedTag(tag)
+                       if tagTypes[t] == nil {
+                               tagTypes[t] = make(map[pbv1.ValueType]struct{})
+                       }
+                       tagTypes[t][vt] = struct{}{}
+               }
+       }
+       var result map[string]struct{}
+       for tag, types := range tagTypes {
+               if len(types) > 1 {
+                       if result == nil {
+                               result = make(map[string]struct{})
+                       }
+                       result[tag] = struct{}{}
+               }
+       }
+       return result
+}
+
+func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader, 
conflictTags map[string]struct{}) (*partMetadata, *traceIDFilter, *tagType, 
error) {
        pendingBlockIsEmpty := true
        pendingBlock := generateBlockPointer()
        defer releaseBlockPointer(pendingBlock)
@@ -383,6 +408,14 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
                        decoder = nil
                }
        }
+       loadAndRename := func() {
+               br.loadBlockData(getDecoder())
+               renameConflictTags(&br.block.block, conflictTags)
+       }
+       readAndRename := func(bm *blockMetadata) {
+               br.mustReadRaw(&rawBlk, bm)
+               renameRawConflictTags(&rawBlk, conflictTags)
+       }
        for br.nextBlockMetadata() {
                select {
                case <-closeCh:
@@ -395,13 +428,13 @@ func mergeBlocks(closeCh <-chan struct{}, bw 
*blockWriter, br *blockReader) (*pa
                nextB := br.peek()
                if !forceSlowMerge && pendingBlockIsEmpty && (nextB == nil || 
nextB.bm.traceID != b.bm.traceID) {
                        // fast path: only a single block for the trace id and 
no pending data
-                       br.mustReadRaw(&rawBlk, &b.bm)
+                       readAndRename(&b.bm)
                        bw.mustWriteRawBlock(&rawBlk)
                        continue
                }
 
                if pendingBlockIsEmpty {
-                       br.loadBlockData(getDecoder())
+                       loadAndRename()
                        pendingBlock.copyFrom(b)
                        pendingBlockIsEmpty = false
                        continue
@@ -416,12 +449,12 @@ func mergeBlocks(closeCh <-chan struct{}, bw 
*blockWriter, br *blockReader) (*pa
                        nextB = br.peek()
                        if !forceSlowMerge && (nextB == nil || nextB.bm.traceID 
!= b.bm.traceID) {
                                // fast path: only a single block for this new 
trace id
-                               br.mustReadRaw(&rawBlk, &b.bm)
+                               readAndRename(&b.bm)
                                bw.mustWriteRawBlock(&rawBlk)
                                continue
                        }
                        // Slow path: start accumulating the new block
-                       br.loadBlockData(getDecoder())
+                       loadAndRename()
                        pendingBlock.copyFrom(b)
                        pendingBlockIsEmpty = false
                        continue
@@ -433,7 +466,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
                }
                tmpBlock.reset()
                tmpBlock.bm.traceID = b.bm.traceID
-               br.loadBlockData(getDecoder())
+               loadAndRename()
                mergeTwoBlocks(tmpBlock, pendingBlock, b)
                if tmpBlock.block.spanSize() <= maxUncompressedSpanSize {
                        if len(tmpBlock.spans) == 0 {
@@ -466,3 +499,40 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
        target.appendAll(left)
        target.appendAll(right)
 }
+
+func renameConflictTags(b *block, conflictTags map[string]struct{}) {
+       if len(conflictTags) == 0 {
+               return
+       }
+       for i := range b.tags {
+               if _, ok := conflictTags[b.tags[i].name]; ok {
+                       b.tags[i].name = encodeTypedTag(b.tags[i].name, 
b.tags[i].valueType)
+               }
+       }
+}
+
+func renameRawConflictTags(r *rawBlock, conflictTags map[string]struct{}) {
+       if len(conflictTags) == 0 {
+               return
+       }
+       bm := r.bm
+       for tag := range conflictTags {
+               if _, ok := bm.tags[tag]; !ok {
+                       continue
+               }
+               valueType := bm.tagType[tag]
+               typedTag := encodeTypedTag(tag, valueType)
+               bm.tags[typedTag] = bm.tags[tag]
+               delete(bm.tags, tag)
+               bm.tagType[typedTag] = valueType
+               delete(bm.tagType, tag)
+               if rawData, ok := r.tags[tag]; ok {
+                       r.tags[typedTag] = rawData
+                       delete(r.tags, tag)
+               }
+               if rawMeta, ok := r.tagMetadata[tag]; ok {
+                       r.tagMetadata[typedTag] = rawMeta
+                       delete(r.tagMetadata, tag)
+               }
+       }
+}
diff --git a/banyand/trace/merger_bench_test.go 
b/banyand/trace/merger_bench_test.go
index fb2aae7f1..601644c21 100644
--- a/banyand/trace/merger_bench_test.go
+++ b/banyand/trace/merger_bench_test.go
@@ -119,7 +119,7 @@ func Benchmark_mergeBlocks_FastRawMerge(b *testing.B) {
 
                                closeCh := make(chan struct{})
 
-                               _, _, _, err := mergeBlocks(closeCh, bw, br)
+                               _, _, _, err := mergeBlocks(closeCh, bw, br, 
nil)
                                if err != nil {
                                        b.Fatal(err)
                                }
@@ -226,7 +226,7 @@ func Benchmark_mergeBlocks_OriginalMerge(b *testing.B) {
 
                                closeCh := make(chan struct{})
 
-                               _, _, _, err := mergeBlocks(closeCh, bw, br)
+                               _, _, _, err := mergeBlocks(closeCh, bw, br, 
nil)
                                if err != nil {
                                        b.Fatal(err)
                                }
@@ -331,7 +331,7 @@ func Benchmark_mergeBlocks_Comparison(b *testing.B) {
 
                        closeCh := make(chan struct{})
 
-                       _, _, _, err := mergeBlocks(closeCh, bw, br)
+                       _, _, _, err := mergeBlocks(closeCh, bw, br, nil)
                        if err != nil {
                                b.Fatal(err)
                        }
@@ -377,7 +377,7 @@ func Benchmark_mergeBlocks_Comparison(b *testing.B) {
 
                        closeCh := make(chan struct{})
 
-                       _, _, _, err := mergeBlocks(closeCh, bw, br)
+                       _, _, _, err := mergeBlocks(closeCh, bw, br, nil)
                        if err != nil {
                                b.Fatal(err)
                        }
diff --git a/banyand/trace/merger_test.go b/banyand/trace/merger_test.go
index f162b400e..e2c5c4e9f 100644
--- a/banyand/trace/merger_test.go
+++ b/banyand/trace/merger_test.go
@@ -269,7 +269,7 @@ func Test_mergeBlocks_fastPath(t *testing.T) {
                        closeCh := make(chan struct{})
                        defer close(closeCh)
 
-                       pm, tf, tagTypes, err := mergeBlocks(closeCh, bw, br)
+                       pm, tf, tagTypes, err := mergeBlocks(closeCh, bw, br, 
nil)
                        require.NoError(t, err)
                        require.NotNil(t, pm)
                        require.NotNil(t, tf)
@@ -1456,3 +1456,155 @@ func Test_flush_cleansUpOnSidxFlushError(t *testing.T) {
        sidxPartPath := filepath.Join(tmpPath, sidxDirName, "idx1", 
fmt.Sprintf("%016x", partID))
        require.False(t, fileSystem.IsExist(sidxPartPath), "sidx part directory 
should be removed on sidx flush failure")
 }
+
+func Test_mergePartsWithConflictingTagTypes(t *testing.T) {
+       tests := []struct {
+               name         string
+               parts        []*traces
+               wantTagTypes map[string]pbv1.ValueType
+               wantDeleted  []string
+       }{
+               {
+                       name: "multiple parts with int and str for same tag",
+                       parts: []*traces{
+                               {
+                                       traceIDs:   []string{"trace1"},
+                                       timestamps: []int64{1},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "state", 
valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(100)},
+                                                       {tag: "name", 
valueType: pbv1.ValueTypeStr, value: []byte("svc1")},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span1")},
+                                       spanIDs: []string{"span1"},
+                               },
+                               {
+                                       traceIDs:   []string{"trace2"},
+                                       timestamps: []int64{2},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "state", 
valueType: pbv1.ValueTypeStr, value: []byte("active")},
+                                                       {tag: "name", 
valueType: pbv1.ValueTypeStr, value: []byte("svc2")},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span2")},
+                                       spanIDs: []string{"span2"},
+                               },
+                       },
+                       wantTagTypes: map[string]pbv1.ValueType{
+                               "state#int": pbv1.ValueTypeInt64,
+                               "state#str": pbv1.ValueTypeStr,
+                               "name":      pbv1.ValueTypeStr,
+                       },
+                       wantDeleted: []string{"state"},
+               },
+               {
+                       name: "merge typed-suffix part with plain part",
+                       parts: []*traces{
+                               {
+                                       traceIDs:   []string{"trace1"},
+                                       timestamps: []int64{1},
+                                       tags: [][]*tagValue{
+                                               {{tag: "state#int", valueType: 
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(100)}},
+                                       },
+                                       spans:   [][]byte{[]byte("span1")},
+                                       spanIDs: []string{"span1"},
+                               },
+                               {
+                                       traceIDs:   []string{"trace2"},
+                                       timestamps: []int64{2},
+                                       tags: [][]*tagValue{
+                                               {{tag: "state", valueType: 
pbv1.ValueTypeStr, value: []byte("pending")}},
+                                       },
+                                       spans:   [][]byte{[]byte("span2")},
+                                       spanIDs: []string{"span2"},
+                               },
+                       },
+                       wantTagTypes: map[string]pbv1.ValueType{
+                               "state#int": pbv1.ValueTypeInt64,
+                               "state#str": pbv1.ValueTypeStr,
+                       },
+                       wantDeleted: []string{"state"},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       tmpPath, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       fileSystem := fs.NewLocalFileSystem()
+
+                       partIDs := make([]uint64, len(tt.parts))
+                       for idx := range partIDs {
+                               partIDs[idx] = uint64(idx)
+                       }
+                       dstID := uint64(999)
+                       dstPath := mergePartsWithConflictTags(t, fileSystem, 
tmpPath, tt.parts, partIDs, dstID)
+
+                       mergedPart := mustOpenFilePart(dstID, tmpPath, 
fileSystem)
+                       for tag, wantType := range tt.wantTagTypes {
+                               gotType, exists := mergedPart.tagType[tag]
+                               require.True(t, exists, "expected tag type 
entry for %q", tag)
+                               require.Equal(t, wantType, gotType, "tag type 
mismatch for %q", tag)
+                       }
+                       for _, tag := range tt.wantDeleted {
+                               _, ok := mergedPart.tagType[tag]
+                               require.False(t, ok, "tag %q should not exist 
in tagType after merge", tag)
+                               dataFile := filepath.Join(dstPath, 
tag+tagsFilenameExt)
+                               require.False(t, fileSystem.IsExist(dataFile), 
"tag data file %q should not exist", dataFile)
+                               metaFile := filepath.Join(dstPath, 
tag+tagsMetadataFilenameExt)
+                               require.False(t, fileSystem.IsExist(metaFile), 
"tag metadata file %q should not exist", metaFile)
+                       }
+               })
+       }
+}
+
+func mergePartsWithConflictTags(t *testing.T, fileSystem fs.FileSystem, 
tmpPath string, tsList []*traces, partIDs []uint64, dstID uint64) string {
+       t.Helper()
+       var parts []*partWrapper
+       for i, traces := range tsList {
+               mp := generateMemPart()
+               mp.mustInitFromTraces(traces)
+               mp.mustFlush(fileSystem, partPath(tmpPath, partIDs[i]))
+               p := mustOpenFilePart(partIDs[i], tmpPath, fileSystem)
+               parts = append(parts, &partWrapper{p: p})
+               releaseMemPart(mp)
+       }
+
+       conflictTags := collectConflictTags(parts)
+
+       var pmi []*partMergeIter
+       var traceSize uint64
+       for i := range tsList {
+               p := mustOpenFilePart(partIDs[i], tmpPath, fileSystem)
+               iter := generatePartMergeIter()
+               iter.mustInitFromPart(p)
+               pmi = append(pmi, iter)
+               traceSize += uint64(len(tsList[i].traceIDs))
+       }
+
+       br := generateBlockReader()
+       br.init(pmi)
+       bw := generateBlockWriter()
+       dstPath := partPath(tmpPath, dstID)
+       bw.mustInitForFilePart(fileSystem, dstPath, false, int(traceSize))
+
+       closeCh := make(chan struct{})
+       pm, tf, tagTypes, mergeErr := mergeBlocks(closeCh, bw, br, conflictTags)
+       close(closeCh)
+       require.NoError(t, mergeErr)
+       require.NotNil(t, pm)
+
+       releaseBlockWriter(bw)
+       releaseBlockReader(br)
+       for _, iter := range pmi {
+               releasePartMergeIter(iter)
+       }
+
+       pm.mustWriteMetadata(fileSystem, dstPath)
+       tf.mustWriteTraceIDFilter(fileSystem, dstPath)
+       tagTypes.mustWriteTagType(fileSystem, dstPath)
+       fileSystem.SyncPath(dstPath)
+       return dstPath
+}
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 801b70872..7845d0eb7 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -252,13 +252,14 @@ func (t *trace) prepareSIDXStreaming(
        }
 
        req := sidx.QueryRequest{
-               Filter:       tqo.SkippingFilter,
-               TagFilter:    tqo.TagFilter,
-               Order:        tqo.Order,
-               MaxBatchSize: tqo.MaxTraceSize,
-               MinKey:       &tqo.MinVal,
-               MaxKey:       &tqo.MaxVal,
-               SeriesIDs:    seriesIDs,
+               Filter:         tqo.SkippingFilter,
+               TagFilter:      tqo.TagFilter,
+               Order:          tqo.Order,
+               MaxBatchSize:   tqo.MaxTraceSize,
+               MinKey:         &tqo.MinVal,
+               MaxKey:         &tqo.MaxVal,
+               SeriesIDs:      seriesIDs,
+               SchemaTagTypes: qo.schemaTagTypes,
        }
        if tqo.TagProjection != nil {
                req.TagProjection = []model.TagProjection{*tqo.TagProjection}
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 77dd2f873..13f4e94e4 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -95,6 +95,7 @@ func (s *standalone) FlagSet() *run.FlagSet {
        fs.IntVar(&s.maxFileSnapshotNum, "trace-max-file-snapshot-num", 10, 
"the maximum number of file snapshots")
        fs.DurationVar(&s.minFileSnapshotAge, "trace-min-file-snapshot-age", 
time.Hour, "minimum age for file snapshots to be eligible for deletion")
        s.option.mergePolicy = newDefaultMergePolicy()
+       fs.IntVar(&s.option.mergePolicy.maxParts, "trace-max-merge-parts", 
s.option.mergePolicy.maxParts, "the maximum number of parts to merge at once")
        fs.VarP(&s.option.mergePolicy.maxFanOutSize, "trace-max-fan-out-size", 
"", "the upper bound of a single file size after merge of trace")
        // Additional flags can be added here
        return fs
diff --git a/banyand/trace/tag.go b/banyand/trace/tag.go
index 647358da7..31c612f9b 100644
--- a/banyand/trace/tag.go
+++ b/banyand/trace/tag.go
@@ -18,6 +18,8 @@
 package trace
 
 import (
+       "strings"
+
        internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -26,6 +28,57 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
+const typedTagSeparator = "#"
+
+var (
+       valueTypeToSuffix = map[pbv1.ValueType]string{
+               pbv1.ValueTypeStr:        "str",
+               pbv1.ValueTypeInt64:      "int",
+               pbv1.ValueTypeFloat64:    "float",
+               pbv1.ValueTypeBinaryData: "bin",
+               pbv1.ValueTypeStrArr:     "str_arr",
+               pbv1.ValueTypeInt64Arr:   "int_arr",
+               pbv1.ValueTypeTimestamp:  "ts",
+               pbv1.ValueTypeUnknown:    "",
+       }
+       suffixToValueType = map[string]pbv1.ValueType{
+               "str":     pbv1.ValueTypeStr,
+               "int":     pbv1.ValueTypeInt64,
+               "float":   pbv1.ValueTypeFloat64,
+               "bin":     pbv1.ValueTypeBinaryData,
+               "str_arr": pbv1.ValueTypeStrArr,
+               "int_arr": pbv1.ValueTypeInt64Arr,
+               "ts":      pbv1.ValueTypeTimestamp,
+       }
+)
+
+func encodeTypedTag(name string, vt pbv1.ValueType) string {
+       suffix, ok := valueTypeToSuffix[vt]
+       if !ok || suffix == "" {
+               return name
+       }
+       return name + typedTagSeparator + suffix
+}
+
+func decodeTypedTag(key string) string {
+       for suffix := range suffixToValueType {
+               sepSuffix := typedTagSeparator + suffix
+               if strings.HasSuffix(key, sepSuffix) {
+                       return key[:len(key)-len(sepSuffix)]
+               }
+       }
+       return key
+}
+
+func hasTypeSuffix(key string) bool {
+       for suffix := range suffixToValueType {
+               if strings.HasSuffix(key, typedTagSeparator+suffix) {
+                       return true
+               }
+       }
+       return false
+}
+
 type tag struct {
        name      string
        values    [][]byte
diff --git a/banyand/trace/tag_test.go b/banyand/trace/tag_test.go
index 2ac463d74..fce0a0233 100644
--- a/banyand/trace/tag_test.go
+++ b/banyand/trace/tag_test.go
@@ -34,6 +34,25 @@ func timeToBytes(t time.Time) []byte {
        return pkgencoding.Int64ToBytes(nil, t.UnixNano())
 }
 
+func TestEncodeTypedTag(t *testing.T) {
+       assert.Equal(t, "state#int", encodeTypedTag("state", 
pbv1.ValueTypeInt64))
+       assert.Equal(t, "state#str", encodeTypedTag("state", pbv1.ValueTypeStr))
+       assert.Equal(t, "http.status#int", encodeTypedTag("http.status", 
pbv1.ValueTypeInt64))
+       assert.Equal(t, "data#bin", encodeTypedTag("data", 
pbv1.ValueTypeBinaryData))
+       assert.Equal(t, "unknown", encodeTypedTag("unknown", 
pbv1.ValueTypeUnknown))
+}
+
+func TestDecodeTypedTag(t *testing.T) {
+       assert.Equal(t, "state", decodeTypedTag("state#int"))
+       assert.Equal(t, "state", decodeTypedTag("state#str"))
+       assert.Equal(t, "http.status", decodeTypedTag("http.status#int"))
+       assert.Equal(t, "http.status", decodeTypedTag("http.status"))
+       assert.Equal(t, "simple", decodeTypedTag("simple"))
+       assert.Equal(t, "my.str", decodeTypedTag("my.str"))
+       assert.Equal(t, "arr.int", decodeTypedTag("arr.int"))
+       assert.Equal(t, "data.bin", decodeTypedTag("data.bin"))
+}
+
 func TestTagEncodingDecoding(t *testing.T) {
        t.Run("test int64 tag encoding/decoding", func(t *testing.T) {
                tag := &tag{
diff --git a/banyand/trace/trace_suite_test.go 
b/banyand/trace/trace_suite_test.go
index daefee2bd..9e69e4790 100644
--- a/banyand/trace/trace_suite_test.go
+++ b/banyand/trace/trace_suite_test.go
@@ -90,7 +90,7 @@ func setUp() (*services, func()) {
        flags = append(flags, "--metadata-root-path="+metaPath)
        rootPath, deferFunc, err := test.NewSpace()
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       flags = append(flags, "--trace-root-path="+rootPath)
+       flags = append(flags, "--trace-root-path="+rootPath, 
"--trace-max-merge-parts=2")
        listenClientURL, listenPeerURL, err := test.NewEtcdListenUrls()
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        flags = append(flags, "--etcd-listen-client-url="+listenClientURL, 
"--etcd-listen-peer-url="+listenPeerURL,


Reply via email to