This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new af494b13 Add integration test for trace (#741)
af494b13 is described below

commit af494b13bd2c406894d1a23735ad3e0388cb9779
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Aug 28 23:09:21 2025 +0800

    Add integration test for trace (#741)
---
 .golangci.yml                                      |   2 +
 AI_CODING_GUIDELINES.md                            |   1 +
 banyand/internal/encoding/tag_encoder.go           | 142 +++-----
 banyand/internal/encoding/tag_encoder_test.go      |  42 ++-
 banyand/internal/sidx/block.go                     |   7 +-
 banyand/internal/sidx/part.go                      |   7 +-
 banyand/internal/sidx/query_result.go              |  14 +-
 banyand/internal/sidx/tag.go                       |   9 -
 banyand/liaison/grpc/discovery.go                  |  90 +++--
 banyand/liaison/grpc/server.go                     |  14 +-
 banyand/liaison/grpc/trace.go                      | 370 +++++++++++++++++++++
 banyand/liaison/http/server.go                     |   2 +
 banyand/stream/tag.go                              |  37 ++-
 banyand/trace/block.go                             |   4 +
 banyand/trace/introducer.go                        |   8 +-
 banyand/trace/metadata.go                          | 149 ++++++++-
 banyand/trace/part.go                              |   4 +
 banyand/trace/query.go                             |  30 +-
 banyand/trace/svc_standalone.go                    |  99 ++----
 banyand/trace/tag.go                               | 306 ++---------------
 banyand/trace/tag_test.go                          | 347 ++++++++-----------
 banyand/trace/timestamp_test.go                    | 123 +++++++
 banyand/trace/trace.go                             |  11 +-
 banyand/trace/trace_suite_test.go                  |   4 +-
 banyand/trace/traces.go                            |   2 +-
 banyand/trace/write_standalone.go                  |  10 +-
 pkg/cmdsetup/standalone.go                         |   6 +
 pkg/pb/v1/value.go                                 |  42 +++
 pkg/pb/v1/value_test.go                            |   9 +
 pkg/schema/init.go                                 |  39 ++-
 pkg/test/setup/setup.go                            |   9 +
 pkg/test/trace/etcd.go                             | 109 ++++++
 .../trace/testdata/groups/test-trace-group.json    |  19 ++
 .../trace/testdata/index_rule_bindings/sw.json     |  17 +
 pkg/test/trace/testdata/index_rules/duration.json  |  14 +
 pkg/test/trace/testdata/index_rules/timestamp.json |  14 +
 pkg/test/trace/testdata/traces/sw.json             |  39 +++
 test/cases/init.go                                 |   4 +
 test/cases/trace/data/data.go                      | 206 ++++++++++++
 test/cases/trace/data/input/all.yml                |  20 ++
 test/cases/trace/data/testdata/sw.json             | 177 ++++++++++
 test/cases/trace/data/want/all.yml                 |  48 +++
 test/cases/trace/trace.go                          |  46 +++
 .../standalone/query/query_suite_test.go           |   4 +
 44 files changed, 1900 insertions(+), 756 deletions(-)

diff --git a/.golangci.yml b/.golangci.yml
index 6fad3c15..be8bc5de 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -69,6 +69,8 @@ linters-settings:
       alias: streamv1
     - pkg: github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1
       alias: clusterv1
+    - pkg: github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1
+      alias: tracev1
     - pkg: github.com/apache/skywalking-banyandb/pkg/pb/v1
       alias: pbv1
   lll:
diff --git a/AI_CODING_GUIDELINES.md b/AI_CODING_GUIDELINES.md
index 2f5b9c62..c02cdce8 100644
--- a/AI_CODING_GUIDELINES.md
+++ b/AI_CODING_GUIDELINES.md
@@ -49,6 +49,7 @@ Use these specific aliases for protobuf packages:
 - github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1 → 
measurev1
 - github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1 → streamv1
 - github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1 → 
clusterv1
+- github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1 → tracev1
 - github.com/apache/skywalking-banyandb/pkg/pb/v1 → pbv1
 
 ## ERROR HANDLING PATTERNS
diff --git a/banyand/internal/encoding/tag_encoder.go 
b/banyand/internal/encoding/tag_encoder.go
index 7859dd1d..78bcf0bf 100644
--- a/banyand/internal/encoding/tag_encoder.go
+++ b/banyand/internal/encoding/tag_encoder.go
@@ -22,7 +22,6 @@ package encoding
 
 import (
        "github.com/apache/skywalking-banyandb/pkg/bytes"
-       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -30,15 +29,10 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
-const (
-       defaultCompressionLevel = 3
-)
-
 var (
        int64SlicePool   = pool.Register[*[]int64]("tag-encoder-int64Slice")
        float64SlicePool = pool.Register[*[]float64]("tag-encoder-float64Slice")
        dictionaryPool   = 
pool.Register[*encoding.Dictionary]("tag-encoder-dictionary")
-       bigValuePool     = bytes.NewBufferPool("tag-encoder-big-value")
 )
 
 func generateInt64Slice(length int) *[]int64 {
@@ -96,14 +90,11 @@ func releaseDictionary(d *encoding.Dictionary) {
 // For int64: uses delta encoding with first value storage.
 // For float64: converts to decimal integers with exponent, then delta 
encoding.
 // For other types: uses dictionary encoding, falls back to plain with zstd 
compression.
-func EncodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, 
error) {
+func EncodeTagValues(bb *bytes.Buffer, values [][]byte, valueType 
pbv1.ValueType) error {
        if len(values) == 0 {
-               return nil, nil
+               return nil
        }
 
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
-
        switch valueType {
        case pbv1.ValueTypeInt64:
                return encodeInt64TagValues(bb, values)
@@ -115,29 +106,22 @@ func EncodeTagValues(values [][]byte, valueType 
pbv1.ValueType) ([]byte, error)
 }
 
 // DecodeTagValues decodes tag values based on the value type.
-func DecodeTagValues(data []byte, valueType pbv1.ValueType, count int) 
([][]byte, error) {
-       if len(data) == 0 {
+func DecodeTagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, valueType pbv1.ValueType, count int) ([][]byte, error) {
+       if len(bb.Buf) == 0 {
                return nil, nil
        }
 
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
-       bb.Buf = append(bb.Buf[:0], data...)
-
-       decoder := &encoding.BytesBlockDecoder{}
-       values := make([][]byte, 0, count)
-
        switch valueType {
        case pbv1.ValueTypeInt64:
-               return decodeInt64TagValues(decoder, bb, uint64(count))
+               return decodeInt64TagValues(dst, decoder, bb, uint64(count))
        case pbv1.ValueTypeFloat64:
-               return decodeFloat64TagValues(decoder, bb, uint64(count))
+               return decodeFloat64TagValues(dst, decoder, bb, uint64(count))
        default:
-               return decodeDefaultTagValues(decoder, bb, uint64(count), 
values)
+               return decodeDefaultTagValues(dst, decoder, bb, uint64(count))
        }
 }
 
-func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) {
+func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) error {
        intValuesPtr := generateInt64Slice(len(values))
        intValues := *intValuesPtr
        defer releaseInt64Slice(intValuesPtr)
@@ -147,11 +131,9 @@ func encodeInt64TagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
                if v == nil || string(v) == "null" {
                        // Handle null values by falling back to default 
encoding
                        bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
-                       // Apply zstd compression for plain encoding
-                       compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
                        // Prepend EncodeTypePlain at the head of compressed 
data
-                       result := 
append([]byte{byte(encoding.EncodeTypePlain)}, compressed...)
-                       return result, nil
+                       bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
+                       return nil
                }
                if len(v) != 8 {
                        logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
@@ -167,14 +149,14 @@ func encodeInt64TagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
        firstValueBytes := convert.Int64ToBytes(firstValue)
 
        // Prepend encodeType (1 byte) and firstValue (8 bytes) to the beginning
-       result := append(
+       bb.Buf = append(
                append([]byte{byte(encodeType)}, firstValueBytes...),
                bb.Buf...,
        )
-       return result, nil
+       return nil
 }
 
-func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) 
{
+func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) error {
        intValuesPtr := generateInt64Slice(len(values))
        intValues := *intValuesPtr
        defer releaseInt64Slice(intValuesPtr)
@@ -189,11 +171,9 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
                if v == nil || string(v) == "null" {
                        // Handle null values by falling back to default 
encoding
                        bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
-                       // Apply zstd compression for plain encoding
-                       compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
                        // Prepend EncodeTypePlain at the head of compressed 
data
-                       result := 
append([]byte{byte(encoding.EncodeTypePlain)}, compressed...)
-                       return result, nil
+                       bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
+                       return nil
                }
                if len(v) != 8 {
                        logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
@@ -204,13 +184,10 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
        intValues, exp, err := 
encoding.Float64ListToDecimalIntList(intValues[:0], floatValues)
        if err != nil {
                logger.Errorf("cannot convert Float64List to DecimalIntList: 
%v", err)
-               // Handle error by falling back to default encoding
                bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
-               // Apply zstd compression for plain encoding
-               compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
                // Prepend EncodeTypePlain at the head of compressed data
-               result := append([]byte{byte(encoding.EncodeTypePlain)}, 
compressed...)
-               return result, nil
+               bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
+               return nil
        }
 
        var firstValue int64
@@ -222,14 +199,14 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
        expBytes := convert.Int16ToBytes(exp)
 
        // Prepend encodeType (1 byte), exp (2 bytes) and firstValue (8 bytes) 
to the beginning
-       result := append(
+       bb.Buf = append(
                append(append([]byte{byte(encodeType)}, expBytes...), 
firstValueBytes...),
                bb.Buf...,
        )
-       return result, nil
+       return nil
 }
 
-func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) 
{
+func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte) error {
        dict := generateDictionary()
        defer releaseDictionary(dict)
 
@@ -238,19 +215,17 @@ func encodeDefaultTagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
                        // Dictionary encoding failed, use plain encoding with 
zstd compression
                        bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
                        bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
-                       // Apply zstd compression for plain encoding
-                       compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
-                       return compressed, nil
+                       return nil
                }
        }
 
        // Dictionary encoding succeeded
        bb.Buf = dict.Encode(bb.Buf[:0])
        bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...)
-       return append([]byte(nil), bb.Buf...), nil
+       return nil
 }
 
-func decodeInt64TagValues(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64) ([][]byte, error) {
+func decodeInt64TagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder, 
bb *bytes.Buffer, count uint64) ([][]byte, error) {
        intValuesPtr := generateInt64Slice(int(count))
        intValues := *intValuesPtr
        defer releaseInt64Slice(intValuesPtr)
@@ -263,21 +238,13 @@ func decodeInt64TagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffer,
        firstByte := encoding.EncodeType(bb.Buf[0])
 
        if firstByte == encoding.EncodeTypePlain {
-               // This is compressed data with EncodeTypePlain at the head
-               // Skip the EncodeTypePlain byte and decompress the rest
-               compressedData := bb.Buf[1:]
-               decompressed, err := zstd.Decompress(nil, compressedData)
-               if err != nil {
-                       logger.Panicf("cannot decompress data: %v", err)
-               }
-
                // Decode the decompressed data
-               values := make([][]byte, 0, count)
-               values, decodeErr := decoder.Decode(values[:0], decompressed, 
count)
+               var decodeErr error
+               dst, decodeErr = decoder.Decode(dst[:0], bb.Buf[1:], count)
                if decodeErr != nil {
                        logger.Panicf("cannot decode values: %v", decodeErr)
                }
-               return values, nil
+               return dst, nil
        }
 
        // Otherwise, this is int list data with EncodeType at the beginning
@@ -296,14 +263,17 @@ func decodeInt64TagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffer,
        }
 
        // Convert int64 array to byte array
-       values := make([][]byte, count)
+       if len(dst) < len(intValues) {
+               dst = append(dst, make([][]byte, len(intValues)-len(dst))...)
+       }
+       dst = dst[:len(intValues)]
        for i, v := range intValues {
-               values[i] = convert.Int64ToBytes(v)
+               dst[i] = convert.Int64ToBytes(v)
        }
-       return values, nil
+       return dst, nil
 }
 
-func decodeFloat64TagValues(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64) ([][]byte, error) {
+func decodeFloat64TagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder, 
bb *bytes.Buffer, count uint64) ([][]byte, error) {
        intValuesPtr := generateInt64Slice(int(count))
        intValues := *intValuesPtr
        defer releaseInt64Slice(intValuesPtr)
@@ -320,21 +290,12 @@ func decodeFloat64TagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffe
        firstByte := encoding.EncodeType(bb.Buf[0])
 
        if firstByte == encoding.EncodeTypePlain {
-               // This is compressed data with EncodeTypePlain at the head
-               // Skip the EncodeTypePlain byte and decompress the rest
-               compressedData := bb.Buf[1:]
-               decompressed, err := zstd.Decompress(nil, compressedData)
-               if err != nil {
-                       logger.Panicf("cannot decompress data: %v", err)
-               }
-
-               // Decode the decompressed data
-               values := make([][]byte, 0, count)
-               values, decodeErr := decoder.Decode(values[:0], decompressed, 
count)
+               var decodeErr error
+               dst, decodeErr = decoder.Decode(dst[:0], bb.Buf[1:], count)
                if decodeErr != nil {
                        logger.Panicf("cannot decode values: %v", decodeErr)
                }
-               return values, nil
+               return dst, nil
        }
 
        // Otherwise, this is float64 int list data with EncodeType at the 
beginning
@@ -363,26 +324,19 @@ func decodeFloat64TagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffe
        }
 
        // Convert float64 array to byte array
-       values := make([][]byte, count)
+       if len(dst) < len(floatValues) {
+               dst = append(dst, make([][]byte, len(floatValues)-len(dst))...)
+       }
+       dst = dst[:len(floatValues)]
        for i, v := range floatValues {
-               values[i] = convert.Float64ToBytes(v)
+               dst[i] = convert.Float64ToBytes(v)
        }
-       return values, nil
+       return dst, nil
 }
 
-func decodeDefaultTagValues(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64, values [][]byte) ([][]byte, error) {
+func decodeDefaultTagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder, 
bb *bytes.Buffer, count uint64) ([][]byte, error) {
        if len(bb.Buf) < 1 {
-               return values, nil
-       }
-
-       // Check if this is zstd compressed data (no encode type prefix)
-       decompressed, decompErr := zstd.Decompress(nil, bb.Buf)
-       if decompErr == nil {
-               // Successfully decompressed, this is compressed data
-               bb.Buf = decompressed
-               if len(bb.Buf) < 1 {
-                       return values, nil
-               }
+               return dst, nil
        }
 
        encodeType := encoding.EncodeType(bb.Buf[0])
@@ -392,15 +346,15 @@ func decodeDefaultTagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffe
        case encoding.EncodeTypeDictionary:
                dict := generateDictionary()
                defer releaseDictionary(dict)
-               values, err = dict.Decode(values[:0], bb.Buf[1:], count)
+               dst, err = dict.Decode(dst[:0], bb.Buf[1:], count)
        case encoding.EncodeTypePlain:
-               values, err = decoder.Decode(values[:0], bb.Buf[1:], count)
+               dst, err = decoder.Decode(dst[:0], bb.Buf[1:], count)
        default:
-               values, err = decoder.Decode(values[:0], bb.Buf[1:], count)
+               dst, err = decoder.Decode(dst[:0], bb.Buf[1:], count)
        }
 
        if err != nil {
                logger.Panicf("cannot decode values: %v", err)
        }
-       return values, nil
+       return dst, nil
 }
diff --git a/banyand/internal/encoding/tag_encoder_test.go 
b/banyand/internal/encoding/tag_encoder_test.go
index a0b1a54d..52e736ed 100644
--- a/banyand/internal/encoding/tag_encoder_test.go
+++ b/banyand/internal/encoding/tag_encoder_test.go
@@ -23,7 +23,9 @@ import (
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
@@ -56,11 +58,13 @@ func TestEncodeDecodeTagValues_Int64_WithNilValues(t 
*testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       encoded, err := EncodeTagValues(tt.values, 
pbv1.ValueTypeInt64)
+                       bb := &bytes.Buffer{}
+                       err := EncodeTagValues(bb, tt.values, 
pbv1.ValueTypeInt64)
                        require.NoError(t, err)
-                       require.NotNil(t, encoded)
+                       require.NotNil(t, bb.Buf)
 
-                       decoded, err := DecodeTagValues(encoded, 
pbv1.ValueTypeInt64, len(tt.values))
+                       decoder := &pkgencoding.BytesBlockDecoder{}
+                       decoded, err := DecodeTagValues(nil, decoder, bb, 
pbv1.ValueTypeInt64, len(tt.values))
                        require.NoError(t, err)
                        require.Len(t, decoded, len(tt.values))
 
@@ -104,11 +108,13 @@ func 
TestEncodeDecodeTagValues_Int64_WithNullStringValues(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       encoded, err := EncodeTagValues(tt.values, 
pbv1.ValueTypeInt64)
+                       bb := &bytes.Buffer{}
+                       err := EncodeTagValues(bb, tt.values, 
pbv1.ValueTypeInt64)
                        require.NoError(t, err)
-                       require.NotNil(t, encoded)
+                       require.NotNil(t, bb.Buf)
 
-                       decoded, err := DecodeTagValues(encoded, 
pbv1.ValueTypeInt64, len(tt.values))
+                       decoder := &pkgencoding.BytesBlockDecoder{}
+                       decoded, err := DecodeTagValues(nil, decoder, bb, 
pbv1.ValueTypeInt64, len(tt.values))
                        require.NoError(t, err)
                        require.Len(t, decoded, len(tt.values))
 
@@ -133,11 +139,13 @@ func 
TestEncodeDecodeTagValues_Int64_MixedNilAndNullString(t *testing.T) {
                []byte("null"),
        }
 
-       encoded, err := EncodeTagValues(values, pbv1.ValueTypeInt64)
+       bb := &bytes.Buffer{}
+       err := EncodeTagValues(bb, values, pbv1.ValueTypeInt64)
        require.NoError(t, err)
-       require.NotNil(t, encoded)
+       require.NotNil(t, bb.Buf)
 
-       decoded, err := DecodeTagValues(encoded, pbv1.ValueTypeInt64, 
len(values))
+       decoder := &pkgencoding.BytesBlockDecoder{}
+       decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64, 
len(values))
        require.NoError(t, err)
        require.Len(t, decoded, len(values))
 
@@ -164,11 +172,13 @@ func TestEncodeDecodeTagValues_Int64_ValidValues(t 
*testing.T) {
                convert.Int64ToBytes(-9223372036854775808), // min int64
        }
 
-       encoded, err := EncodeTagValues(values, pbv1.ValueTypeInt64)
+       bb := &bytes.Buffer{}
+       err := EncodeTagValues(bb, values, pbv1.ValueTypeInt64)
        require.NoError(t, err)
-       require.NotNil(t, encoded)
+       require.NotNil(t, bb.Buf)
 
-       decoded, err := DecodeTagValues(encoded, pbv1.ValueTypeInt64, 
len(values))
+       decoder := &pkgencoding.BytesBlockDecoder{}
+       decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64, 
len(values))
        require.NoError(t, err)
        require.Len(t, decoded, len(values))
 
@@ -178,11 +188,13 @@ func TestEncodeDecodeTagValues_Int64_ValidValues(t 
*testing.T) {
 }
 
 func TestEncodeDecodeTagValues_Int64_EmptyInput(t *testing.T) {
-       encoded, err := EncodeTagValues(nil, pbv1.ValueTypeInt64)
+       bb := &bytes.Buffer{}
+       err := EncodeTagValues(bb, nil, pbv1.ValueTypeInt64)
        require.NoError(t, err)
-       assert.Nil(t, encoded)
+       assert.Nil(t, bb.Buf)
 
-       decoded, err := DecodeTagValues(nil, pbv1.ValueTypeInt64, 0)
+       decoder := &pkgencoding.BytesBlockDecoder{}
+       decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64, 
0)
        require.NoError(t, err)
        assert.Nil(t, decoded)
 }
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index d740ef0b..77bc1c4c 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -23,6 +23,7 @@ import (
        "fmt"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -278,15 +279,15 @@ func (b *block) mustWriteTag(tagName string, td *tagData, 
bm *blockMetadata, ww
        }()
 
        // Encode tag values using the encoding module
-       encodedData, err := encodeTagValues(td.values, td.valueType)
+       err := internalencoding.EncodeTagValues(bb, td.values, td.valueType)
        if err != nil {
                panic(fmt.Sprintf("failed to encode tag values: %v", err))
        }
 
        // Write tag data without compression
        tm.dataBlock.offset = tdw.bytesWritten
-       tm.dataBlock.size = uint64(len(encodedData))
-       tdw.MustWrite(encodedData)
+       tm.dataBlock.size = uint64(len(bb.Buf))
+       tdw.MustWrite(bb.Buf)
 
        // Write bloom filter if indexed
        if td.indexed && td.filter != nil {
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 6840b0e4..454fadcd 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -25,6 +25,7 @@ import (
        "strings"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
@@ -354,7 +355,7 @@ func (p *part) readAll() ([]*elements, error) {
 
                        // Read tags for each tag name
                        for tagName := range bm.tagsBlocks {
-                               err = p.readBlockTags(tagName, bm, elems)
+                               err = p.readBlockTags(tagName, bm, elems, 
blockBytesDecoder)
                                if err != nil {
                                        releaseElements(elems)
                                        for _, e := range result {
@@ -372,7 +373,7 @@ func (p *part) readAll() ([]*elements, error) {
 }
 
 // readBlockTags reads and decodes tag data for a specific tag in a block.
-func (p *part) readBlockTags(tagName string, bm *blockMetadata, elems 
*elements) error {
+func (p *part) readBlockTags(tagName string, bm *blockMetadata, elems 
*elements, decoder *encoding.BytesBlockDecoder) error {
        tagBlockInfo, exists := bm.tagsBlocks[tagName]
        if !exists {
                return fmt.Errorf("tag block info not found for tag: %s", 
tagName)
@@ -405,7 +406,7 @@ func (p *part) readBlockTags(tagName string, bm 
*blockMetadata, elems *elements)
        fs.MustReadData(tdReader, int64(tm.dataBlock.offset), tdData)
 
        // Decode tag values directly (no compression)
-       tagValues, err := decodeTagValues(tdData, tm.valueType, int(bm.count))
+       tagValues, err := internalencoding.DecodeTagValues(nil, decoder, 
&bytes.Buffer{Buf: tdData}, tm.valueType, int(bm.count))
        if err != nil {
                return fmt.Errorf("cannot decode tag values: %w", err)
        }
diff --git a/banyand/internal/sidx/query_result.go 
b/banyand/internal/sidx/query_result.go
index 87154640..fa3ddc15 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -25,6 +25,7 @@ import (
        "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/cgroups"
@@ -247,7 +248,7 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p 
*part, bm *blockMetadata
                        continue // Skip missing tags
                }
 
-               if !qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, 
int(bm.count)) {
+               if !qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, 
int(bm.count), decoder) {
                        // Continue loading other tags even if one fails
                        continue
                }
@@ -257,7 +258,7 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p 
*part, bm *blockMetadata
 }
 
 // loadTagData loads data for a specific tag, following the pattern from 
readBlockTags.
-func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, 
tagBlockInfo *dataBlock, count int) bool {
+func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, 
tagBlockInfo *dataBlock, count int, decoder *encoding.BytesBlockDecoder) bool {
        // Get tag metadata reader
        tmReader, tmExists := p.getTagMetadataReader(tagName)
        if !tmExists {
@@ -302,18 +303,17 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p 
*part, tagName string, tag
        bb2.Buf = bytes.ResizeOver(bb2.Buf[:0], int(tm.dataBlock.size))
        fs.MustReadData(tdReader, int64(tm.dataBlock.offset), bb2.Buf)
 
+       // Create tag data structure and populate block
+       td := generateTagData()
        // Decode tag values directly (no compression)
-       tagValues, err := decodeTagValues(bb2.Buf, tm.valueType, count)
+       td.values, err = internalencoding.DecodeTagValues(td.values[:0], 
decoder, bb2, tm.valueType, count)
        if err != nil {
                return false
        }
 
-       // Create tag data structure and populate block
-       td := generateTagData()
        td.name = tagName
        td.valueType = tm.valueType
        td.indexed = tm.indexed
-       td.values = tagValues
 
        // Set min/max for int64 tags
        if tm.valueType == pbv1.ValueTypeInt64 {
@@ -324,7 +324,7 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p 
*part, tagName string, tag
        // Create bloom filter for indexed tags if needed
        if tm.indexed {
                td.filter = generateBloomFilter(count)
-               for _, value := range tagValues {
+               for _, value := range td.values {
                        if value != nil {
                                td.filter.Add(value)
                        }
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index abb6bc33..9a38f381 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -21,7 +21,6 @@ import (
        "bytes"
        "fmt"
 
-       "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/filter"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -186,14 +185,6 @@ func decodeBloomFilter(src []byte) (*filter.BloomFilter, 
error) {
        return bf, nil
 }
 
-func encodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, 
error) {
-       return encoding.EncodeTagValues(values, valueType)
-}
-
-func decodeTagValues(data []byte, valueType pbv1.ValueType, count int) 
([][]byte, error) {
-       return encoding.DecodeTagValues(data, valueType, count)
-}
-
 // updateMinMax updates min/max values for int64 tags.
 func (td *tagData) updateMinMax() {
        if td.valueType != pbv1.ValueTypeInt64 || len(td.values) == 0 {
diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index 859bd7b5..8df965ad 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -47,7 +47,12 @@ type discoveryService struct {
 }
 
 func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, 
nodeRegistry NodeRegistry, gr *groupRepo) *discoveryService {
-       er := &entityRepo{entitiesMap: make(map[identity]partition.Locator)}
+       er := &entityRepo{
+               entitiesMap:     make(map[identity]partition.Locator),
+               measureMap:      make(map[identity]*databasev1.Measure),
+               traceMap:        make(map[identity]*databasev1.Trace),
+               traceIDIndexMap: make(map[identity]int),
+       }
        return newDiscoveryServiceWithEntityRepo(kind, metadataRepo, 
nodeRegistry, gr, er)
 }
 
@@ -184,9 +189,11 @@ var _ schema.EventHandler = (*entityRepo)(nil)
 
 type entityRepo struct {
        schema.UnimplementedOnInitHandler
-       log         *logger.Logger
-       entitiesMap map[identity]partition.Locator
-       measureMap  map[identity]*databasev1.Measure
+       log             *logger.Logger
+       entitiesMap     map[identity]partition.Locator
+       measureMap      map[identity]*databasev1.Measure
+       traceMap        map[identity]*databasev1.Trace
+       traceIDIndexMap map[identity]int // Cache trace ID tag index
        sync.RWMutex
 }
 
@@ -195,20 +202,6 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        var l partition.Locator
        var id identity
        var modRevision int64
-       switch schemaMetadata.Kind {
-       case schema.KindMeasure:
-               measure := schemaMetadata.Spec.(*databasev1.Measure)
-               modRevision = measure.GetMetadata().GetModRevision()
-               l = partition.NewEntityLocator(measure.TagFamilies, 
measure.Entity, modRevision)
-               id = getID(measure.GetMetadata())
-       case schema.KindStream:
-               stream := schemaMetadata.Spec.(*databasev1.Stream)
-               modRevision = stream.GetMetadata().GetModRevision()
-               l = partition.NewEntityLocator(stream.TagFamilies, 
stream.Entity, modRevision)
-               id = getID(stream.GetMetadata())
-       default:
-               return
-       }
        if le := e.log.Debug(); le.Enabled() {
                var kind string
                switch schemaMetadata.Kind {
@@ -216,6 +209,8 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
                        kind = "measure"
                case schema.KindStream:
                        kind = "stream"
+               case schema.KindTrace:
+                       kind = "trace"
                default:
                        kind = "unknown"
                }
@@ -225,6 +220,39 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
                        Str("kind", kind).
                        Msg("entity added or updated")
        }
+       switch schemaMetadata.Kind {
+       case schema.KindMeasure:
+               measure := schemaMetadata.Spec.(*databasev1.Measure)
+               modRevision = measure.GetMetadata().GetModRevision()
+               l = partition.NewEntityLocator(measure.TagFamilies, measure.Entity, modRevision)
+               id = getID(measure.GetMetadata())
+       case schema.KindStream:
+               stream := schemaMetadata.Spec.(*databasev1.Stream)
+               modRevision = stream.GetMetadata().GetModRevision()
+               l = partition.NewEntityLocator(stream.TagFamilies, stream.Entity, modRevision)
+               id = getID(stream.GetMetadata())
+       case schema.KindTrace:
+               trace := schemaMetadata.Spec.(*databasev1.Trace)
+               id = getID(trace.GetMetadata())
+               // Pre-compute trace ID tag index
+               traceIDTagName := trace.GetTraceIdTagName()
+               traceIDIndex := -1
+               for i, tagSpec := range trace.GetTags() {
+                       if tagSpec.GetName() == traceIDTagName {
+                               traceIDIndex = i
+                               break
+                       }
+               }
+               // Guard map writes with the same lock getTrace/getTraceIDIndex read under.
+               e.RWMutex.Lock()
+               defer e.RWMutex.Unlock()
+               e.traceMap[id] = trace
+               e.traceIDIndexMap[id] = traceIDIndex
+               return
+       default:
+               return
+       }
+
        e.RWMutex.Lock()
        defer e.RWMutex.Unlock()
        e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, 
ModRevision: modRevision}
@@ -246,6 +271,9 @@ func (e *entityRepo) OnDelete(schemaMetadata 
schema.Metadata) {
        case schema.KindStream:
                stream := schemaMetadata.Spec.(*databasev1.Stream)
                id = getID(stream.GetMetadata())
+       case schema.KindTrace:
+               trace := schemaMetadata.Spec.(*databasev1.Trace)
+               id = getID(trace.GetMetadata())
        default:
                return
        }
@@ -256,6 +284,8 @@ func (e *entityRepo) OnDelete(schemaMetadata 
schema.Metadata) {
                        kind = "measure"
                case schema.KindStream:
                        kind = "stream"
+               case schema.KindTrace:
+                       kind = "trace"
                default:
                        kind = "unknown"
                }
@@ -268,7 +298,9 @@ func (e *entityRepo) OnDelete(schemaMetadata 
schema.Metadata) {
        e.RWMutex.Lock()
        defer e.RWMutex.Unlock()
        delete(e.entitiesMap, id)
-       delete(e.measureMap, id) // Ensure measure is not stored for streams
+       delete(e.measureMap, id)      // Clean up measure
+       delete(e.traceMap, id)        // Clean up trace
+       delete(e.traceIDIndexMap, id) // Clean up trace ID index
 }
 
 func (e *entityRepo) getLocator(id identity) (partition.Locator, bool) {
@@ -281,6 +313,26 @@ func (e *entityRepo) getLocator(id identity) 
(partition.Locator, bool) {
        return el, true
 }
 
+func (e *entityRepo) getTrace(id identity) (*databasev1.Trace, bool) {
+       e.RWMutex.RLock()
+       defer e.RWMutex.RUnlock()
+       trace, ok := e.traceMap[id]
+       if !ok {
+               return nil, false
+       }
+       return trace, true
+}
+
+func (e *entityRepo) getTraceIDIndex(id identity) (int, bool) {
+       e.RWMutex.RLock()
+       defer e.RWMutex.RUnlock()
+       index, ok := e.traceIDIndexMap[id]
+       if !ok {
+               return -1, false
+       }
+       return index, true
+}
+
 var _ schema.EventHandler = (*shardingKeyRepo)(nil)
 
 type shardingKeyRepo struct {
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index ecc84fd2..f8d0c704 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -40,6 +40,7 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -97,6 +98,7 @@ type server struct {
        *topNAggregationRegistryServer
        *groupRegistryServer
        *traceRegistryServer
+       traceSVC                 *traceService
        authReloader             *auth.Reloader
        metrics                  *metrics
        keyFile                  string
@@ -130,11 +132,17 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                pipeline:         tir1Client,
                broadcaster:      broadcaster,
        }
+       traceSVC := &traceService{
+               discoveryService: newDiscoveryService(schema.KindTrace, 
schemaRegistry, nr.StreamLiaisonNodeRegistry, gr),
+               pipeline:         tir1Client,
+               broadcaster:      broadcaster,
+       }
 
        s := &server{
                omr:        omr,
                streamSVC:  streamSVC,
                measureSVC: measureSVC,
+               traceSVC:   traceSVC,
                groupRepo:  gr,
                streamRegistryServer: &streamRegistryServer{
                        schemaRegistry: schemaRegistry,
@@ -169,7 +177,7 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                schemaRepo:   schemaRegistry,
                authReloader: auth.InitAuthReloader(),
        }
-       s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC}
+       s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC, 
traceSVC}
 
        return s
 }
@@ -178,10 +186,12 @@ func (s *server) PreRun(_ context.Context) error {
        s.log = logger.GetLogger("liaison-grpc")
        s.streamSVC.setLogger(s.log.Named("stream-t1"))
        s.measureSVC.setLogger(s.log)
+       s.traceSVC.setLogger(s.log.Named("trace"))
        s.propertyServer.SetLogger(s.log)
        components := []*discoveryService{
                s.streamSVC.discoveryService,
                s.measureSVC.discoveryService,
+               s.traceSVC.discoveryService,
                s.propertyServer.discoveryService,
        }
        s.schemaRepo.RegisterHandler("liaison", schema.KindGroup, s.groupRepo)
@@ -208,6 +218,7 @@ func (s *server) PreRun(_ context.Context) error {
        s.metrics = metrics
        s.streamSVC.metrics = metrics
        s.measureSVC.metrics = metrics
+       s.traceSVC.metrics = metrics
        s.propertyServer.metrics = metrics
        s.streamRegistryServer.metrics = metrics
        s.indexRuleBindingRegistryServer.metrics = metrics
@@ -339,6 +350,7 @@ func (s *server) Serve() run.StopNotify {
        commonv1.RegisterServiceServer(s.ser, &apiVersionService{})
        streamv1.RegisterStreamServiceServer(s.ser, s.streamSVC)
        measurev1.RegisterMeasureServiceServer(s.ser, s.measureSVC)
+       tracev1.RegisterTraceServiceServer(s.ser, s.traceSVC)
        databasev1.RegisterGroupRegistryServiceServer(s.ser, 
s.groupRegistryServer)
        databasev1.RegisterIndexRuleBindingRegistryServiceServer(s.ser, 
s.indexRuleBindingRegistryServer)
        databasev1.RegisterIndexRuleRegistryServiceServer(s.ser, 
s.indexRuleRegistryServer)
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
new file mode 100644
index 00000000..599d054c
--- /dev/null
+++ b/banyand/liaison/grpc/trace.go
@@ -0,0 +1,370 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "hash/fnv"
+       "io"
+       "time"
+
+       "github.com/pkg/errors"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/accesslog"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+type traceService struct {
+       tracev1.UnimplementedTraceServiceServer
+       ingestionAccessLog accesslog.Log
+       pipeline           queue.Client
+       broadcaster        queue.Client
+       *discoveryService
+       l               *logger.Logger
+       metrics         *metrics
+       writeTimeout    time.Duration
+       maxWaitDuration time.Duration
+}
+
+func (s *traceService) setLogger(log *logger.Logger) {
+       s.l = log
+}
+
+func (s *traceService) activeIngestionAccessLog(root string, sampled bool) 
(err error) {
+       if s.ingestionAccessLog, err = accesslog.
+               NewFileLog(root, "trace-ingest-%s", 10*time.Minute, s.log, 
sampled); err != nil {
+               return err
+       }
+       return nil
+}
+
+func (s *traceService) validateTimestamp(writeEntity *tracev1.WriteRequest) 
error {
+       // Get trace schema from entityRepo
+       id := getID(writeEntity.GetMetadata())
+       traceEntity, existed := s.entityRepo.getTrace(id)
+       if !existed {
+               return errors.New("trace schema not found")
+       }
+
+       timestampTagName := traceEntity.GetTimestampTagName()
+       for _, tag := range writeEntity.GetTags() {
+               if tag.GetTimestamp() != nil {
+                       if err := timestamp.CheckPb(tag.GetTimestamp()); err != 
nil {
+                               s.l.Error().Stringer("written", 
writeEntity).Err(err).Msg("the timestamp is invalid")
+                               return err
+                       }
+                       return nil
+               }
+       }
+
+       return errors.New("timestamp tag not found: " + timestampTagName)
+}
+
+func (s *traceService) validateMetadata(writeEntity *tracev1.WriteRequest) 
error {
+       if writeEntity.Metadata.ModRevision > 0 {
+               traceCache, existed := 
s.entityRepo.getTrace(getID(writeEntity.GetMetadata()))
+               if !existed {
+                       return errors.New("trace schema not found")
+               }
+               if writeEntity.Metadata.ModRevision != 
traceCache.GetMetadata().GetModRevision() {
+                       return errors.New("expired trace schema")
+               }
+       }
+       return nil
+}
+
+func (s *traceService) extractTraceID(tags []*modelv1.TagValue, traceIDIndex 
int) (string, error) {
+       if len(tags) == 0 {
+               return "", errors.New("no tags found")
+       }
+
+       if traceIDIndex < 0 || traceIDIndex >= len(tags) {
+               return "", errors.New("trace ID tag index out of range")
+       }
+
+       tag := tags[traceIDIndex]
+       switch v := tag.GetValue().(type) {
+       case *modelv1.TagValue_Str:
+               return v.Str.GetValue(), nil
+       case *modelv1.TagValue_BinaryData:
+               return string(v.BinaryData), nil
+       default:
+               return "", errors.New("trace ID must be string or binary data")
+       }
+}
+
+func (s *traceService) getTraceShardID(writeEntity *tracev1.WriteRequest) 
(common.ShardID, error) {
+       // Get shard count from group configuration
+       shardCount, existed := 
s.groupRepo.shardNum(writeEntity.GetMetadata().GetGroup())
+       if !existed {
+               return 0, errors.New("group not found or no shard 
configuration")
+       }
+
+       // Get cached trace ID index from entityRepo
+       id := getID(writeEntity.GetMetadata())
+       traceIDIndex, existed := s.entityRepo.getTraceIDIndex(id)
+       if !existed {
+               return 0, errors.New("trace schema not found")
+       }
+
+       if traceIDIndex == -1 {
+               return 0, errors.New("trace ID tag not found in schema")
+       }
+
+       traceID, err := s.extractTraceID(writeEntity.GetTags(), traceIDIndex)
+       if err != nil {
+               return 0, err
+       }
+
+       // Calculate shard ID using hash of trace ID
+       hasher := fnv.New32a()
+       hasher.Write([]byte(traceID))
+       hash := hasher.Sum32()
+
+       return common.ShardID(hash % shardCount), nil
+}
+
+func (s *traceService) getTraceShardIDWithRetry(writeEntity 
*tracev1.WriteRequest) (common.ShardID, error) {
+       if s.maxWaitDuration > 0 {
+               retryInterval := 10 * time.Millisecond
+               startTime := time.Now()
+               for {
+                       shardID, err := s.getTraceShardID(writeEntity)
+                       if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > s.maxWaitDuration {
+                               return shardID, err
+                       }
+                       time.Sleep(retryInterval)
+                       retryInterval = time.Duration(float64(retryInterval) * 
1.5)
+                       if retryInterval > time.Second {
+                               retryInterval = time.Second
+                       }
+               }
+       }
+       return s.getTraceShardID(writeEntity)
+}
+
+func (s *traceService) publishMessages(
+       ctx context.Context,
+       publisher queue.BatchPublisher,
+       writeEntity *tracev1.WriteRequest,
+       shardID common.ShardID,
+) ([]string, error) {
+       iwr := &tracev1.InternalWriteRequest{
+               ShardId: uint32(shardID),
+               Request: writeEntity,
+       }
+       nodeID, err := 
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), 
writeEntity.GetMetadata().GetName(), uint32(shardID), 0)
+       if err != nil {
+               return nil, err
+       }
+
+       message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
+       if _, err := publisher.Publish(ctx, data.TopicTraceWrite, message); err 
!= nil {
+               return nil, err
+       }
+       return []string{nodeID}, nil
+}
+
+func (s *traceService) Write(stream tracev1.TraceService_WriteServer) error {
+       reply := func(metadata *commonv1.Metadata, status modelv1.Status, 
version uint64, stream tracev1.TraceService_WriteServer, logger *logger.Logger) 
{
+               if status != modelv1.Status_STATUS_SUCCEED {
+                       s.metrics.totalStreamMsgReceivedErr.Inc(1, 
metadata.Group, "trace", "write")
+               }
+               s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "trace", 
"write")
+               if errResp := stream.Send(&tracev1.WriteResponse{Metadata: 
metadata, Status: status.String(), Version: version}); errResp != nil {
+                       if dl := logger.Debug(); dl.Enabled() {
+                               dl.Err(errResp).Msg("failed to send trace write 
response")
+                       }
+                       s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"trace", "write")
+               }
+       }
+
+       s.metrics.totalStreamStarted.Inc(1, "trace", "write")
+       publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
+       start := time.Now()
+       var succeedSent []succeedSentMessage
+       requestCount := 0
+       defer func() {
+               cee, err := publisher.Close()
+               for _, ssm := range succeedSent {
+                       code := modelv1.Status_STATUS_SUCCEED
+                       if cee != nil {
+                               for _, node := range ssm.nodes {
+                                       if ce, ok := cee[node]; ok {
+                                               code = ce.Status()
+                                               break
+                                       }
+                               }
+                       }
+                       reply(ssm.metadata, code, ssm.messageID, stream, s.l)
+               }
+               if err != nil {
+                       s.l.Error().Err(err).Msg("failed to close the 
publisher")
+               }
+               if dl := s.l.Debug(); dl.Enabled() {
+                       dl.Int("total_requests", requestCount).Msg("completed 
trace write batch")
+               }
+               s.metrics.totalStreamFinished.Inc(1, "trace", "write")
+               s.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), 
"trace", "write")
+       }()
+
+       ctx := stream.Context()
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+               }
+
+               writeEntity, err := stream.Recv()
+               if errors.Is(err, io.EOF) {
+                       return nil
+               }
+               if err != nil {
+                       if !errors.Is(err, context.DeadlineExceeded) && 
!errors.Is(err, context.Canceled) {
+                               s.l.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to receive message")
+                       }
+                       return err
+               }
+
+               requestCount++
+               s.metrics.totalStreamMsgReceived.Inc(1, 
writeEntity.Metadata.Group, "trace", "write")
+
+               if err = s.validateTimestamp(writeEntity); err != nil {
+                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream, s.l)
+                       continue
+               }
+
+               if err = s.validateMetadata(writeEntity); err != nil {
+                       status := modelv1.Status_STATUS_INTERNAL_ERROR
+                       if err.Error() == "trace schema not found" {
+                               status = modelv1.Status_STATUS_NOT_FOUND
+                       } else if err.Error() == "expired trace schema" {
+                               status = modelv1.Status_STATUS_EXPIRED_SCHEMA
+                       }
+                       s.l.Error().Err(err).Stringer("written", 
writeEntity).Msg("metadata validation failed")
+                       reply(writeEntity.GetMetadata(), status, 
writeEntity.GetVersion(), stream, s.l)
+                       continue
+               }
+
+               shardID, err := s.getTraceShardIDWithRetry(writeEntity)
+               if err != nil {
+                       s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("trace sharding failed")
+                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream, s.l)
+                       continue
+               }
+
+               if s.ingestionAccessLog != nil {
+                       if errAL := s.ingestionAccessLog.Write(writeEntity); 
errAL != nil {
+                               s.l.Error().Err(errAL).Msg("failed to write 
ingestion access log")
+                       }
+               }
+
+               nodes, err := s.publishMessages(ctx, publisher, writeEntity, 
shardID)
+               if err != nil {
+                       s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("publishing failed")
+                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream, s.l)
+                       continue
+               }
+
+               succeedSent = append(succeedSent, succeedSentMessage{
+                       metadata:  writeEntity.GetMetadata(),
+                       messageID: writeEntity.GetVersion(),
+                       nodes:     nodes,
+               })
+       }
+}
+
+var emptyTraceQueryResponse = &tracev1.QueryResponse{Spans: 
make([]*tracev1.Span, 0)}
+
+func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest) 
(resp *tracev1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               s.metrics.totalStarted.Inc(1, g, "trace", "query")
+       }
+       start := time.Now()
+       defer func() {
+               for _, g := range req.Groups {
+                       s.metrics.totalFinished.Inc(1, g, "trace", "query")
+                       if err != nil {
+                               s.metrics.totalErr.Inc(1, g, "trace", "query")
+                       }
+                       s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
g, "trace", "query")
+               }
+       }()
+       timeRange := req.GetTimeRange()
+       if timeRange == nil {
+               req.TimeRange = timestamp.DefaultTimeRange
+       }
+       if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
+               return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", req.GetTimeRange(), err)
+       }
+       now := time.Now()
+       if req.Trace {
+               tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+               span, _ := tracer.StartSpan(ctx, "trace-grpc")
+               span.Tag("request", convert.BytesToString(logger.Proto(req)))
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       } else {
+                               span.AddSubTrace(resp.TraceQueryResult)
+                               resp.TraceQueryResult = tracer.ToProto()
+                       }
+                       span.Stop()
+               }()
+       }
+       message := bus.NewMessage(bus.MessageID(now.UnixNano()), req)
+       feat, errQuery := s.broadcaster.Publish(ctx, data.TopicTraceQuery, 
message)
+       if errQuery != nil {
+               if errors.Is(errQuery, io.EOF) {
+                       return emptyTraceQueryResponse, nil
+               }
+               return nil, errQuery
+       }
+       msg, errFeat := feat.Get()
+       if errFeat != nil {
+               return nil, errFeat
+       }
+       data := msg.Data()
+       switch d := data.(type) {
+       case *tracev1.QueryResponse:
+               return d, nil
+       case *common.Error:
+               return nil, errors.WithMessage(errQueryMsg, d.Error())
+       }
+       return nil, errors.Errorf("unexpected data type %T", data)
+}
+
+func (s *traceService) Close() error {
+       if s.ingestionAccessLog != nil {
+               return s.ingestionAccessLog.Close()
+       }
+       return nil
+}
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 20bac794..d5259514 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -43,6 +43,7 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth"
        "github.com/apache/skywalking-banyandb/pkg/healthcheck"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -359,6 +360,7 @@ func (p *server) initGRPCClient() error {
                measurev1.RegisterMeasureServiceHandlerFromEndpoint(p.grpcCtx, 
p.gwMux, p.grpcAddr, opts),
                
propertyv1.RegisterPropertyServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, 
p.grpcAddr, opts),
                
databasev1.RegisterTraceRegistryServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, 
p.grpcAddr, opts),
+               tracev1.RegisterTraceServiceHandlerFromEndpoint(p.grpcCtx, 
p.gwMux, p.grpcAddr, opts),
        )
        if err != nil {
                return errors.Wrap(err, "failed to register endpoints")
diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go
index f1751313..d78bcb89 100644
--- a/banyand/stream/tag.go
+++ b/banyand/stream/tag.go
@@ -18,7 +18,7 @@
 package stream
 
 import (
-       "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+       internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -61,18 +61,21 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter 
*writer, tagFilterWriter *w
        tm.name = t.name
        tm.valueType = t.valueType
 
+       bb := bigValuePool.Generate()
+       defer bigValuePool.Release(bb)
+
        // Use shared encoding module
-       encodedData, err := encoding.EncodeTagValues(t.values, t.valueType)
+       err := internalencoding.EncodeTagValues(bb, t.values, t.valueType)
        if err != nil {
                logger.Panicf("failed to encode tag values: %v", err)
        }
 
-       tm.size = uint64(len(encodedData))
+       tm.size = uint64(len(bb.Buf))
        if tm.size > maxValuesBlockSize {
                logger.Panicf("too large valuesSize: %d bytes; mustn't exceed 
%d bytes", tm.size, maxValuesBlockSize)
        }
        tm.offset = tagWriter.bytesWritten
-       tagWriter.MustWrite(encodedData)
+       tagWriter.MustWrite(bb.Buf)
 
        if t.filter != nil {
                bb := bigValuePool.Generate()
@@ -88,7 +91,7 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, 
tagFilterWriter *w
        }
 }
 
-func (t *tag) mustReadValues(_ *pkgencoding.BytesBlockDecoder, reader 
fs.Reader, cm tagMetadata, count uint64) {
+func (t *tag) mustReadValues(decoder *pkgencoding.BytesBlockDecoder, reader 
fs.Reader, cm tagMetadata, count uint64) {
        t.name = cm.name
        t.valueType = cm.valueType
        if t.valueType == pbv1.ValueTypeUnknown {
@@ -103,18 +106,20 @@ func (t *tag) mustReadValues(_ 
*pkgencoding.BytesBlockDecoder, reader fs.Reader,
                logger.Panicf("%s: block size cannot exceed %d bytes; got %d 
bytes", reader.Path(), maxValuesBlockSize, valuesSize)
        }
 
-       data := make([]byte, valuesSize)
-       fs.MustReadData(reader, int64(cm.offset), data)
-
        // Use shared decoding module
-       decodedValues, err := encoding.DecodeTagValues(data, t.valueType, 
int(count))
+       bb := bigValuePool.Generate()
+       defer bigValuePool.Release(bb)
+       bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(valuesSize))
+       fs.MustReadData(reader, int64(cm.offset), bb.Buf)
+
+       var err error
+       t.values, err = internalencoding.DecodeTagValues(t.values, decoder, bb, 
t.valueType, int(count))
        if err != nil {
                logger.Panicf("%s: failed to decode tag values: %v", 
reader.Path(), err)
        }
-       t.values = decodedValues
 }
 
-func (t *tag) mustSeqReadValues(_ *pkgencoding.BytesBlockDecoder, reader 
*seqReader, cm tagMetadata, count uint64) {
+func (t *tag) mustSeqReadValues(decoder *pkgencoding.BytesBlockDecoder, reader 
*seqReader, cm tagMetadata, count uint64) {
        t.name = cm.name
        t.valueType = cm.valueType
        if cm.offset != reader.bytesRead {
@@ -125,15 +130,17 @@ func (t *tag) mustSeqReadValues(_ 
*pkgencoding.BytesBlockDecoder, reader *seqRea
                logger.Panicf("%s: block size cannot exceed %d bytes; got %d 
bytes", reader.Path(), maxValuesBlockSize, valuesSize)
        }
 
-       data := make([]byte, valuesSize)
-       reader.mustReadFull(data)
+       bb := bigValuePool.Generate()
+       defer bigValuePool.Release(bb)
+       bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(valuesSize))
+       reader.mustReadFull(bb.Buf)
 
        // Use shared decoding module
-       decodedValues, err := encoding.DecodeTagValues(data, t.valueType, 
int(count))
+       var err error
+       t.values, err = internalencoding.DecodeTagValues(t.values, decoder, bb, 
t.valueType, int(count))
        if err != nil {
                logger.Panicf("%s: failed to decode tag values: %v", 
reader.Path(), err)
        }
-       t.values = decodedValues
 }
 
 var bigValuePool = bytes.NewBufferPool("stream-big-value")
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index 1fc28c26..48dd3c30 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -159,10 +159,12 @@ func (b *block) unmarshalTag(decoder 
*encoding.BytesBlockDecoder, i int,
        if err != nil {
                logger.Panicf("%s: cannot unmarshal tagMetadata: %v", 
metaReader.Path(), err)
        }
+       tm.name = name
        bigValuePool.Release(bb)
        b.tags[i].name = name
        if valueType, ok := tagType[name]; ok {
                b.tags[i].valueType = valueType
+               tm.valueType = valueType
        } else {
                b.tags[i].valueType = pbv1.ValueTypeUnknown
                for j := range b.tags[i].values {
@@ -199,6 +201,8 @@ func (b *block) unmarshalTagFromSeqReaders(decoder 
*encoding.BytesBlockDecoder,
        sort.Strings(keys)
        b.tags[i].name = keys[i]
        b.tags[i].valueType = tagType[keys[i]]
+       tm.name = keys[i]
+       tm.valueType = tagType[keys[i]]
        b.tags[i].mustSeqReadValues(decoder, valueReader, *tm, uint64(b.Len()))
 }
 
diff --git a/banyand/trace/introducer.go b/banyand/trace/introducer.go
index a67869de..809820c9 100644
--- a/banyand/trace/introducer.go
+++ b/banyand/trace/introducer.go
@@ -32,7 +32,7 @@ func (i *introduction) reset() {
        i.applied = nil
 }
 
-var introductionPool = pool.Register[*introduction]("stream-introduction")
+var introductionPool = pool.Register[*introduction]("trace-introduction")
 
 func generateIntroduction() *introduction {
        v := introductionPool.Get()
@@ -60,7 +60,7 @@ func (i *flusherIntroduction) reset() {
        i.applied = nil
 }
 
-var flusherIntroductionPool = 
pool.Register[*flusherIntroduction]("stream-flusher-introduction")
+var flusherIntroductionPool = 
pool.Register[*flusherIntroduction]("trace-flusher-introduction")
 
 func generateFlusherIntroduction() *flusherIntroduction {
        v := flusherIntroductionPool.Get()
@@ -94,7 +94,7 @@ func (i *mergerIntroduction) reset() {
        i.creator = 0
 }
 
-var mergerIntroductionPool = 
pool.Register[*mergerIntroduction]("stream-merger-introduction")
+var mergerIntroductionPool = 
pool.Register[*mergerIntroduction]("trace-merger-introduction")
 
 func generateMergerIntroduction() *mergerIntroduction {
        v := mergerIntroductionPool.Get()
@@ -122,7 +122,7 @@ func (i *syncIntroduction) reset() {
        i.applied = nil
 }
 
-var syncIntroductionPool = 
pool.Register[*syncIntroduction]("stream-sync-introduction")
+var syncIntroductionPool = 
pool.Register[*syncIntroduction]("trace-sync-introduction")
 
 func generateSyncIntroduction() *syncIntroduction {
        v := syncIntroductionPool.Get()
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 0d3c9cf0..b44836bf 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -20,10 +20,12 @@ package trace
 import (
        "context"
        "fmt"
+       "path"
        "time"
 
        "github.com/pkg/errors"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/api/validate"
@@ -34,7 +36,9 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/banyand/queue/pub"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -91,7 +95,10 @@ func newLiaisonSchemaRepo(path string, svc *liaison, 
traceDataNodeRegistry grpc.
 }
 
 func (sr *schemaRepo) start() {
-       sr.l.Info().Str("path", sr.path).Msg("starting trace metadata 
repository")
+       sr.Watcher()
+       sr.metadata.
+               RegisterHandler("trace", 
schema.KindGroup|schema.KindTrace|schema.KindIndexRuleBinding|schema.KindIndexRule,
+                       sr)
 }
 
 func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) (*trace, bool) {
@@ -103,8 +110,16 @@ func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) 
(*trace, bool) {
        return t, ok
 }
 
-func (sr *schemaRepo) GetRemovalSegmentsTimeRange(_ string) 
*timestamp.TimeRange {
-       panic("not implemented")
+func (sr *schemaRepo) GetRemovalSegmentsTimeRange(group string) 
*timestamp.TimeRange {
+       g, ok := sr.LoadGroup(group)
+       if !ok {
+               return nil
+       }
+       db := g.SupplyTSDB()
+       if db == nil {
+               return nil
+       }
+       return db.(storage.TSDB[*tsTable, option]).GetExpiredSegmentsTimeRange()
 }
 
 func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) {
@@ -260,6 +275,15 @@ type supplier struct {
 }
 
 func newSupplier(path string, svc *standalone, nodeLabels map[string]string) 
*supplier {
+       if svc.pm == nil {
+               svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier")
+       }
+       opt := svc.option
+       opt.protector = svc.pm
+
+       if opt.protector == nil {
+               svc.l.Panic().Msg("CRITICAL: opt.protector is still nil after 
assignment")
+       }
        return &supplier{
                metadata:   svc.metadata,
                omr:        svc.omr,
@@ -267,7 +291,7 @@ func newSupplier(path string, svc *standalone, nodeLabels 
map[string]string) *su
                l:          svc.l,
                nodeLabels: nodeLabels,
                path:       path,
-               option:     svc.option,
+               option:     opt,
        }
 }
 
@@ -282,8 +306,64 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.Resourc
        return s.metadata.TraceRegistry().GetTrace(ctx, md)
 }
 
-func (s *supplier) OpenDB(_ *commonv1.Group) (resourceSchema.DB, error) {
-       panic("not implemented")
+func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, 
error) {
+       name := groupSchema.Metadata.Group
+       p := common.Position{
+               Module:   "trace",
+               Database: name,
+       }
+       ro := groupSchema.ResourceOpts
+       if ro == nil {
+               return nil, fmt.Errorf("no resource opts in group %s", name)
+       }
+       shardNum := ro.ShardNum
+       ttl := ro.Ttl
+       segInterval := ro.SegmentInterval
+       segmentIdleTimeout := time.Duration(0)
+       if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
+               var ttlNum uint32
+               for _, st := range ro.Stages {
+                       if st.Ttl.Unit != ro.Ttl.Unit {
+                               return nil, fmt.Errorf("ttl unit %s is not 
consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit)
+                       }
+                       selector, err := pub.ParseLabelSelector(st.NodeSelector)
+                       if err != nil {
+                               return nil, errors.WithMessagef(err, "failed to 
parse node selector %s", st.NodeSelector)
+                       }
+                       ttlNum += st.Ttl.Num
+                       if !selector.Matches(s.nodeLabels) {
+                               continue
+                       }
+                       ttl.Num += ttlNum
+                       shardNum = st.ShardNum
+                       segInterval = st.SegmentInterval
+                       if st.Close {
+                               segmentIdleTimeout = 5 * time.Minute
+                       }
+                       break
+               }
+       }
+       group := groupSchema.Metadata.Group
+       opts := storage.TSDBOpts[*tsTable, option]{
+               ShardNum:                       shardNum,
+               Location:                       path.Join(s.path, group),
+               TSTableCreator:                 newTSTable,
+               TableMetrics:                   s.newMetrics(p),
+               SegmentInterval:                
storage.MustToIntervalRule(segInterval),
+               TTL:                            storage.MustToIntervalRule(ttl),
+               Option:                         s.option,
+               SeriesIndexFlushTimeoutSeconds: 
s.option.flushTimeout.Nanoseconds() / int64(time.Second),
+               SeriesIndexCacheMaxBytes:       
int(s.option.seriesCacheMaxSize),
+               StorageMetricsFactory:          
s.omr.With(traceScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), 
p.DBLabelValues()))),
+               SegmentIdleTimeout:             segmentIdleTimeout,
+               MemoryLimit:                    s.pm.GetLimit(),
+       }
+       return storage.OpenTSDB(
+               common.SetPosition(context.Background(), func(_ 
common.Position) common.Position {
+                       return p
+               }),
+               opts, nil, group,
+       )
 }
 
 // queueSupplier is the supplier for liaison service.
@@ -299,6 +379,15 @@ type queueSupplier struct {
 }
 
 func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry 
grpc.NodeRegistry) *queueSupplier {
+       if svc.pm == nil {
+               svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newQueueSupplier")
+       }
+       opt := svc.option
+       opt.protector = svc.pm
+
+       if opt.protector == nil {
+               svc.l.Panic().Msg("CRITICAL: opt.protector is still nil after 
assignment")
+       }
        return &queueSupplier{
                metadata:              svc.metadata,
                omr:                   svc.omr,
@@ -306,7 +395,7 @@ func newQueueSupplier(path string, svc *liaison, 
traceDataNodeRegistry grpc.Node
                traceDataNodeRegistry: traceDataNodeRegistry,
                l:                     svc.l,
                path:                  path,
-               option:                svc.option,
+               option:                opt,
        }
 }
 
@@ -321,6 +410,48 @@ func (qs *queueSupplier) ResourceSchema(md 
*commonv1.Metadata) (resourceSchema.R
        return qs.metadata.TraceRegistry().GetTrace(ctx, md)
 }
 
-func (qs *queueSupplier) OpenDB(_ *commonv1.Group) (resourceSchema.DB, error) {
-       panic("not implemented")
+func (qs *queueSupplier) OpenDB(groupSchema *commonv1.Group) 
(resourceSchema.DB, error) {
+       name := groupSchema.Metadata.Group
+       p := common.Position{
+               Module:   "trace",
+               Database: name,
+       }
+       ro := groupSchema.ResourceOpts
+       if ro == nil {
+               return nil, fmt.Errorf("no resource opts in group %s", name)
+       }
+       shardNum := ro.ShardNum
+       group := groupSchema.Metadata.Group
+       opts := wqueue.Opts[*tsTable, option]{
+               Group:           group,
+               ShardNum:        shardNum,
+               SegmentInterval: storage.MustToIntervalRule(ro.SegmentInterval),
+               Location:        path.Join(qs.path, group),
+               Option:          qs.option,
+               Metrics:         qs.newMetrics(p),
+               SubQueueCreator: newWriteQueue,
+               GetNodes: func(shardID common.ShardID) []string {
+                       copies := ro.Replicas + 1
+                       nodeSet := make(map[string]struct{}, copies)
+                       for i := uint32(0); i < copies; i++ {
+                               nodeID, err := 
qs.traceDataNodeRegistry.Locate(group, "", uint32(shardID), i)
+                               if err != nil {
+                                       qs.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate 
node")
+                                       return nil
+                               }
+                               nodeSet[nodeID] = struct{}{}
+                       }
+                       nodes := make([]string, 0, len(nodeSet))
+                       for nodeID := range nodeSet {
+                               nodes = append(nodes, nodeID)
+                       }
+                       return nodes
+               },
+       }
+       return wqueue.Open(
+               common.SetPosition(context.Background(), func(_ 
common.Position) common.Position {
+                       return p
+               }),
+               opts, group,
+       )
 }
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 0bd9d5cd..ee61e8c6 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -484,6 +484,10 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, 
path string) {
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
        mp.tagType.mustWriteTagType(fileSystem, path)
        mp.traceIDFilter.mustWriteTraceIDFilter(fileSystem, path)
+       if mp.traceIDFilter.filter != nil {
+               releaseBloomFilter(mp.traceIDFilter.filter)
+               mp.traceIDFilter.filter = nil
+       }
 
        fileSystem.SyncPath(path)
 }
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index df81f15c..7be69cbe 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -24,8 +24,8 @@ import (
        "sort"
 
        "github.com/pkg/errors"
+       "google.golang.org/protobuf/types/known/timestamppb"
 
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
@@ -310,17 +310,6 @@ func (qr *queryResult) merge() *model.TraceResult {
        return result
 }
 
-func mustEncodeTagValue(name string, tagType databasev1.TagType, tagValue 
*modelv1.TagValue, num int) [][]byte {
-       values := make([][]byte, num)
-       tv := encodeTagValue(name, tagType, tagValue)
-       defer releaseTagValue(tv)
-       value := tv.marshal()
-       for i := 0; i < num; i++ {
-               values[i] = value
-       }
-       return values
-}
-
 func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) 
*modelv1.TagValue {
        if value == nil {
                return pbv1.NullTagValue
@@ -351,6 +340,12 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value 
[]byte) *modelv1.TagValu
                        values = append(values, string(bb.Buf))
                }
                return strArrTagValue(values)
+       case pbv1.ValueTypeTimestamp:
+               // Convert 64-bit nanoseconds since epoch back to protobuf 
timestamp
+               epochNanos := convert.BytesToInt64(value)
+               seconds := epochNanos / 1e9
+               nanos := int32(epochNanos % 1e9)
+               return timestampTagValue(seconds, nanos)
        default:
                logger.Panicf("unsupported value type: %v", valueType)
                return nil
@@ -406,3 +401,14 @@ func strArrTagValue(values []string) *modelv1.TagValue {
                },
        }
 }
+
+func timestampTagValue(seconds int64, nanos int32) *modelv1.TagValue {
+       return &modelv1.TagValue{
+               Value: &modelv1.TagValue_Timestamp{
+                       Timestamp: &timestamppb.Timestamp{
+                               Seconds: seconds,
+                               Nanos:   nanos,
+                       },
+               },
+       }
+}
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index a2c101bd..05e4e858 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -21,11 +21,15 @@ import (
        "context"
        "path"
        "path/filepath"
+       "strings"
 
        "github.com/pkg/errors"
 
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
@@ -62,11 +66,6 @@ type standalone struct {
        maxFileSnapshotNum  int
 }
 
-// StandaloneService returns a new standalone service.
-func StandaloneService(_ context.Context) (Service, error) {
-       return &standalone{}, nil
-}
-
 func (s *standalone) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("trace")
        fs.StringVar(&s.root, "trace-root-path", "/tmp", "the root path for 
trace data")
@@ -93,41 +92,38 @@ func (s *standalone) Role() databasev1.Role {
        return databasev1.Role_ROLE_DATA
 }
 
-func (s *standalone) PreRun(_ context.Context) error {
-       s.l = logger.GetLogger("trace")
-
-       // Initialize metadata
-       if s.metadata == nil {
-               return errors.New("metadata repo is required")
-       }
-
-       // Initialize filesystem
-       if s.lfs == nil {
-               s.lfs = fs.NewLocalFileSystem()
-       }
-
-       // Initialize protector
-       if s.pm == nil {
-               return errors.New("memory protector is required")
+func (s *standalone) PreRun(ctx context.Context) error {
+       s.l = logger.GetLogger(s.Name())
+       s.l.Info().Msg("memory protector is initialized in PreRun")
+       s.lfs = fs.NewLocalFileSystemWithLoggerAndLimit(s.l, s.pm.GetLimit())
+       path := path.Join(s.root, s.Name())
+       s.snapshotDir = filepath.Join(path, storage.SnapshotsDir)
+       observability.UpdatePath(path)
+       val := ctx.Value(common.ContextNodeKey)
+       if val == nil {
+               return errors.New("node id is empty")
        }
-
-       // Initialize pipeline
-       if s.pipeline == nil {
-               return errors.New("pipeline is required")
-       }
-
-       // Set up data path
+       node := val.(common.Node)
        if s.dataPath == "" {
-               s.dataPath = path.Join(s.root, "trace-data")
+               s.dataPath = filepath.Join(path, storage.DataDir)
        }
-
-       // Initialize schema repository
-       var nodeLabels map[string]string
-       s.schemaRepo = newSchemaRepo(s.dataPath, s, nodeLabels)
+       if !strings.HasPrefix(filepath.VolumeName(s.dataPath), 
filepath.VolumeName(path)) {
+               observability.UpdatePath(s.dataPath)
+       }
+       s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels)
 
        // Initialize snapshot directory
        s.snapshotDir = filepath.Join(s.dataPath, "snapshot")
 
+       // Set up write callback handler
+       if s.pipeline != nil {
+               writeListener := setUpWriteCallback(s.l, &s.schemaRepo, 
s.maxDiskUsagePercent)
+               err := s.pipeline.Subscribe(data.TopicTraceWrite, writeListener)
+               if err != nil {
+                       return err
+               }
+       }
+
        s.l.Info().
                Str("root", s.root).
                Str("dataPath", s.dataPath).
@@ -138,12 +134,7 @@ func (s *standalone) PreRun(_ context.Context) error {
 }
 
 func (s *standalone) Serve() run.StopNotify {
-       // As specified in the plan, no pipeline listeners should be implemented
-       s.l.Info().Msg("trace standalone service started")
-
-       // Return a channel that never closes since this service runs 
indefinitely
-       stopCh := make(chan struct{})
-       return stopCh
+       return s.schemaRepo.StopCh()
 }
 
 func (s *standalone) GracefulStop() {
@@ -169,36 +160,6 @@ func (s *standalone) GetRemovalSegmentsTimeRange(group 
string) *timestamp.TimeRa
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
 
-// SetMetadata sets the metadata repository.
-func (s *standalone) SetMetadata(metadata metadata.Repo) {
-       s.metadata = metadata
-}
-
-// SetObservabilityRegistry sets the observability metrics registry.
-func (s *standalone) SetObservabilityRegistry(omr 
observability.MetricsRegistry) {
-       s.omr = omr
-}
-
-// SetProtector sets the memory protector.
-func (s *standalone) SetProtector(pm protector.Memory) {
-       s.pm = pm
-}
-
-// SetPipeline sets the pipeline server.
-func (s *standalone) SetPipeline(pipeline queue.Server) {
-       s.pipeline = pipeline
-}
-
-// SetLocalPipeline sets the local pipeline queue.
-func (s *standalone) SetLocalPipeline(localPipeline queue.Queue) {
-       s.localPipeline = localPipeline
-}
-
-// SetFileSystem sets the file system.
-func (s *standalone) SetFileSystem(lfs fs.FileSystem) {
-       s.lfs = lfs
-}
-
 // NewService returns a new service.
 func NewService(metadata metadata.Repo, pipeline queue.Server, omr 
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
        return &standalone{
diff --git a/banyand/trace/tag.go b/banyand/trace/tag.go
index 1510c997..bd576ece 100644
--- a/banyand/trace/tag.go
+++ b/banyand/trace/tag.go
@@ -18,70 +18,12 @@
 package trace
 
 import (
+       internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-       "github.com/apache/skywalking-banyandb/pkg/pool"
-)
-
-func generateInt64Slice(length int) *[]int64 {
-       v := int64SlicePool.Get()
-       if v == nil {
-               s := make([]int64, length)
-               return &s
-       }
-       if cap(*v) < length {
-               *v = make([]int64, length)
-       } else {
-               *v = (*v)[:length]
-       }
-       return v
-}
-
-func releaseInt64Slice(int64Slice *[]int64) {
-       *int64Slice = (*int64Slice)[:0]
-       int64SlicePool.Put(int64Slice)
-}
-
-func generateFloat64Slice(length int) *[]float64 {
-       v := float64SlicePool.Get()
-       if v == nil {
-               s := make([]float64, length)
-               return &s
-       }
-       if cap(*v) < length {
-               *v = make([]float64, length)
-       } else {
-               *v = (*v)[:length]
-       }
-       return v
-}
-
-func releaseFloat64Slice(float64Slice *[]float64) {
-       *float64Slice = (*float64Slice)[:0]
-       float64SlicePool.Put(float64Slice)
-}
-
-func generateDictionary() *encoding.Dictionary {
-       v := dictionaryPool.Get()
-       if v == nil {
-               return &encoding.Dictionary{}
-       }
-       return v
-}
-
-func releaseDictionary(d *encoding.Dictionary) {
-       d.Reset()
-       dictionaryPool.Put(d)
-}
-
-var (
-       int64SlicePool   = pool.Register[*[]int64]("trace-int64Slice")
-       float64SlicePool = pool.Register[*[]float64]("trace-float64Slice")
-       dictionaryPool   = 
pool.Register[*encoding.Dictionary]("trace-dictionary")
 )
 
 type tag struct {
@@ -116,18 +58,14 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter 
*writer) {
        tm.name = t.name
        tm.valueType = t.valueType
 
+       // Use shared encoding module
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
-
-       // select encoding based on data type
-       switch t.valueType {
-       case pbv1.ValueTypeInt64:
-               t.encodeInt64Tag(bb)
-       case pbv1.ValueTypeFloat64:
-               t.encodeFloat64Tag(bb)
-       default:
-               t.encodeDefault(bb)
+       err := internalencoding.EncodeTagValues(bb, t.values, t.valueType)
+       if err != nil {
+               logger.Panicf("failed to encode tag values: %v", err)
        }
+
        tm.size = uint64(len(bb.Buf))
        if tm.size > maxValuesBlockSize {
                logger.Panicf("too large valuesSize: %d bytes; mustn't exceed 
%d bytes", tm.size, maxValuesBlockSize)
@@ -136,123 +74,37 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter 
*writer) {
        tagWriter.MustWrite(bb.Buf)
 }
 
-func (t *tag) encodeInt64Tag(bb *bytes.Buffer) {
-       // convert byte array to int64 array
-       intValuesPtr := generateInt64Slice(len(t.values))
-       intValues := *intValuesPtr
-       defer releaseInt64Slice(intValuesPtr)
-       var encodeType encoding.EncodeType
-
-       for i, v := range t.values {
-               if v == nil || string(v) == "null" {
-                       t.encodeDefault(bb)
-                       encodeType = encoding.EncodeTypePlain
-                       // Prepend encodeType (1 byte) to the beginning
-                       bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...)
-                       return
-               }
-               if len(v) != 8 {
-                       logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
-               }
-               intValues[i] = convert.BytesToInt64(v)
-       }
-       // use delta encoding for integer column
-       var firstValue int64
-       bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], 
intValues)
-       if encodeType == encoding.EncodeTypeUnknown {
-               logger.Panicf("invalid encode type for int64 values")
-       }
-       firstValueBytes := convert.Int64ToBytes(firstValue)
-       // Prepend encodeType (1 byte) and firstValue (8 bytes) to the beginning
-       bb.Buf = append(
-               append([]byte{byte(encodeType)}, firstValueBytes...),
-               bb.Buf...,
-       )
-}
-
-func (t *tag) encodeFloat64Tag(bb *bytes.Buffer) {
-       // convert byte array to float64 array
-       intValuesPtr := generateInt64Slice(len(t.values))
-       intValues := *intValuesPtr
-       defer releaseInt64Slice(intValuesPtr)
-
-       floatValuesPtr := generateFloat64Slice(len(t.values))
-       floatValues := *floatValuesPtr
-       defer releaseFloat64Slice(floatValuesPtr)
-
-       var encodeType encoding.EncodeType
-
-       doEncodeDefault := func() {
-               t.encodeDefault(bb)
-               encodeType = encoding.EncodeTypePlain
-               // Prepend encodeType (1 byte) to the beginning
-               bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...)
-       }
-
-       for i, v := range t.values {
-               if v == nil || string(v) == "null" {
-                       doEncodeDefault()
-                       return
-               }
-               if len(v) != 8 {
-                       logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
-               }
-               floatValues[i] = convert.BytesToFloat64(v)
-       }
-       intValues, exp, err := 
encoding.Float64ListToDecimalIntList(intValues[:0], floatValues)
-       if err != nil {
-               logger.Errorf("cannot convert Float64List to DecimalIntList : 
%v", err)
-               doEncodeDefault()
-               return
-       }
-       var firstValue int64
-       bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], 
intValues)
-       if encodeType == encoding.EncodeTypeUnknown {
-               logger.Panicf("invalid encode type for int64 values")
-       }
-       firstValueBytes := convert.Int64ToBytes(firstValue)
-       expBytes := convert.Int16ToBytes(exp)
-       // Prepend encodeType (1 byte), exp (2 bytes) and firstValue (8 bytes) 
to the beginning
-       bb.Buf = append(
-               append(append([]byte{byte(encodeType)}, expBytes...), 
firstValueBytes...),
-               bb.Buf...,
-       )
-}
-
-func (t *tag) encodeDefault(bb *bytes.Buffer) {
-       dict := generateDictionary()
-       defer releaseDictionary(dict)
-       for _, v := range t.values {
-               if !dict.Add(v) {
-                       bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], t.values)
-                       bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
-                       return
-               }
-       }
-       bb.Buf = dict.Encode(bb.Buf[:0])
-       bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...)
-}
-
-func (t *tag) mustReadValues(decoder *encoding.BytesBlockDecoder, reader 
fs.Reader, cm tagMetadata, count uint64) {
+func (t *tag) mustReadValues(decoder *pkgencoding.BytesBlockDecoder, reader 
fs.Reader, cm tagMetadata, count uint64) {
+       t.name = cm.name
+       t.valueType = cm.valueType
        if t.valueType == pbv1.ValueTypeUnknown {
-               for i := uint64(0); i < count; i++ {
+               for range count {
                        t.values = append(t.values, nil)
                }
                return
        }
 
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
        valuesSize := cm.size
        if valuesSize > maxValuesBlockSize {
                logger.Panicf("%s: block size cannot exceed %d bytes; got %d 
bytes", reader.Path(), maxValuesBlockSize, valuesSize)
        }
-       bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize))
+
+       bb := bigValuePool.Generate()
+       defer bigValuePool.Release(bb)
+       bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(valuesSize))
        fs.MustReadData(reader, int64(cm.offset), bb.Buf)
-       t.decodeTagValues(decoder, reader.Path(), count, bb)
+
+       // Use shared decoding module
+       var err error
+       t.values, err = internalencoding.DecodeTagValues(t.values, decoder, bb, 
t.valueType, int(count))
+       if err != nil {
+               logger.Panicf("%s: failed to decode tag values: %v", 
reader.Path(), err)
+       }
 }
 
-func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader 
*seqReader, cm tagMetadata, count uint64) {
+func (t *tag) mustSeqReadValues(decoder *pkgencoding.BytesBlockDecoder, reader 
*seqReader, cm tagMetadata, count uint64) {
+       t.name = cm.name
+       t.valueType = cm.valueType
        if cm.offset != reader.bytesRead {
                logger.Panicf("%s: offset mismatch: %d vs %d", reader.Path(), 
cm.offset, reader.bytesRead)
        }
@@ -263,114 +115,14 @@ func (t *tag) mustSeqReadValues(decoder 
*encoding.BytesBlockDecoder, reader *seq
 
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
-
-       bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize))
+       bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(valuesSize))
        reader.mustReadFull(bb.Buf)
-       t.decodeTagValues(decoder, reader.Path(), count, bb)
-}
-
-func (t *tag) decodeTagValues(decoder *encoding.BytesBlockDecoder, path 
string, count uint64, bb *bytes.Buffer) {
-       switch t.valueType {
-       case pbv1.ValueTypeInt64:
-               t.decodeInt64Tag(decoder, path, count, bb)
-       case pbv1.ValueTypeFloat64:
-               t.decodeFloat64Tag(decoder, path, count, bb)
-       default:
-               t.decodeDefault(decoder, bb, count, path)
-       }
-}
-
-func (t *tag) decodeInt64Tag(decoder *encoding.BytesBlockDecoder, path string, 
count uint64, bb *bytes.Buffer) {
-       // decode integer type
-       intValuesPtr := generateInt64Slice(int(count))
-       intValues := *intValuesPtr
-       defer releaseInt64Slice(intValuesPtr)
-
-       if len(bb.Buf) < 1 {
-               logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", 1, len(bb.Buf))
-       }
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       if encodeType == encoding.EncodeTypePlain {
-               bb.Buf = bb.Buf[1:]
-               t.decodeDefault(decoder, bb, count, path)
-               return
-       }
-
-       const expectedLen = 9
-       if len(bb.Buf) < expectedLen {
-               logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", expectedLen, len(bb.Buf))
-       }
-       firstValue := convert.BytesToInt64(bb.Buf[1:9])
-       bb.Buf = bb.Buf[9:]
-       var err error
-       intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, 
encodeType, firstValue, int(count))
-       if err != nil {
-               logger.Panicf("%s: cannot decode int values: %v", path, err)
-       }
-       // convert int64 array to byte array
-       t.values = make([][]byte, count)
-       for i, v := range intValues {
-               t.values[i] = convert.Int64ToBytes(v)
-       }
-}
-
-func (t *tag) decodeFloat64Tag(decoder *encoding.BytesBlockDecoder, path 
string, count uint64, bb *bytes.Buffer) {
-       // decode float type
-       intValuesPtr := generateInt64Slice(int(count))
-       intValues := *intValuesPtr
-       defer releaseInt64Slice(intValuesPtr)
-       floatValuesPtr := generateFloat64Slice(int(count))
-       floatValues := *floatValuesPtr
-       defer releaseFloat64Slice(floatValuesPtr)
-
-       if len(bb.Buf) < 1 {
-               logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", 1, len(bb.Buf))
-       }
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       if encodeType == encoding.EncodeTypePlain {
-               bb.Buf = bb.Buf[1:]
-               t.decodeDefault(decoder, bb, count, path)
-               return
-       }
 
-       const expectedLen = 11
-       if len(bb.Buf) < expectedLen {
-               logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", expectedLen, len(bb.Buf))
-       }
-       exp := convert.BytesToInt16(bb.Buf[1:3])
-       firstValue := convert.BytesToInt64(bb.Buf[3:11])
-       bb.Buf = bb.Buf[11:]
+       // Use shared decoding module
        var err error
-       intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, 
encodeType, firstValue, int(count))
-       if err != nil {
-               logger.Panicf("%s: cannot decode int values: %v", path, err)
-       }
-       floatValues, err = 
encoding.DecimalIntListToFloat64List(floatValues[:0], intValues, exp, 
int(count))
-       if err != nil {
-               logger.Panicf("cannot convert DecimalIntList to Float64List: 
%v", err)
-       }
-       if uint64(len(floatValues)) != count {
-               logger.Panicf("unexpected floatValues length: got %d, expected 
%d", len(floatValues), count)
-       }
-       // convert float64 array to byte array
-       t.values = make([][]byte, count)
-       for i, v := range floatValues {
-               t.values[i] = convert.Float64ToBytes(v)
-       }
-}
-
-func (t *tag) decodeDefault(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64, path string) {
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       var err error
-       if encodeType == encoding.EncodeTypeDictionary {
-               dict := generateDictionary()
-               defer releaseDictionary(dict)
-               t.values, err = dict.Decode(t.values[:0], bb.Buf[1:], count)
-       } else {
-               t.values, err = decoder.Decode(t.values[:0], bb.Buf[1:], count)
-       }
+       t.values, err = internalencoding.DecodeTagValues(t.values, decoder, bb, 
t.valueType, int(count))
        if err != nil {
-               logger.Panicf("%s: cannot decode values: %v", path, err)
+               logger.Panicf("%s: failed to decode tag values: %v", 
reader.Path(), err)
        }
 }
 
diff --git a/banyand/trace/tag_test.go b/banyand/trace/tag_test.go
index 7d20eda5..52376c87 100644
--- a/banyand/trace/tag_test.go
+++ b/banyand/trace/tag_test.go
@@ -18,223 +18,158 @@
 package trace
 
 import (
-       "fmt"
        "testing"
+       "time"
 
        "github.com/stretchr/testify/assert"
 
-       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+       pkgbytes "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
-func TestTag_reset(t *testing.T) {
-       tt := &tag{
-               name:      "test",
-               valueType: pbv1.ValueTypeStr,
-               values:    [][]byte{[]byte("value1"), []byte("value2")},
-       }
-
-       tt.reset()
-
-       assert.Equal(t, "", tt.name)
-       assert.Equal(t, 0, len(tt.values))
-}
-
-func TestTag_resizeValues(t *testing.T) {
-       tt := &tag{
-               values: make([][]byte, 2, 5),
-       }
-
-       values := tt.resizeValues(3)
-       assert.Equal(t, 3, len(values))
-       assert.Equal(t, 5, cap(values))
-
-       values = tt.resizeValues(6)
-       assert.Equal(t, 6, len(values))
-       assert.True(t, cap(values) >= 6) // The capacity is at least 6, but 
could be more
+func timeToBytes(t time.Time) []byte {
+       return pkgencoding.Int64ToBytes(nil, t.UnixNano())
 }
 
-func TestTag_mustWriteTo_mustReadValues(t *testing.T) {
-       tests := []struct {
-               tag  *tag
-               name string
-       }{
-               {
-                       name: "string with nils",
-                       tag: &tag{
-                               name:      "test",
-                               valueType: pbv1.ValueTypeStr,
-                               values:    [][]byte{[]byte("value1"), nil, 
[]byte("value2"), nil},
+func TestTagEncodingDecoding(t *testing.T) {
+       t.Run("test int64 tag encoding/decoding", func(t *testing.T) {
+               tag := &tag{
+                       name:      "test_int64",
+                       valueType: pbv1.ValueTypeInt64,
+                       values: [][]byte{
+                               convert.Int64ToBytes(100),
+                               convert.Int64ToBytes(200),
+                               convert.Int64ToBytes(300),
                        },
-               },
-               {
-                       name: "int64 with null",
-                       tag: &tag{
-                               name:      "test",
-                               valueType: pbv1.ValueTypeInt64,
-                               values:    [][]byte{[]byte("null"), nil, 
[]byte("null"), nil},
+               }
+
+               // Test encoding
+               bb := &pkgbytes.Buffer{}
+               err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+               assert.NoError(t, err)
+               assert.NotNil(t, bb.Buf)
+               assert.Greater(t, len(bb.Buf), 0)
+
+               // Test decoding
+               decoder := &pkgencoding.BytesBlockDecoder{}
+               decodedValues, err := encoding.DecodeTagValues(nil, decoder, 
bb, tag.valueType, len(tag.values))
+               assert.NoError(t, err)
+               assert.Equal(t, len(tag.values), len(decodedValues))
+               assert.Equal(t, tag.values, decodedValues)
+       })
+
+       t.Run("test float64 tag encoding/decoding", func(t *testing.T) {
+               tag := &tag{
+                       name:      "test_float64",
+                       valueType: pbv1.ValueTypeFloat64,
+                       values: [][]byte{
+                               convert.Float64ToBytes(1.5),
+                               convert.Float64ToBytes(2.5),
+                               convert.Float64ToBytes(3.5),
                        },
-               },
-               {
-                       name: "valid int64 values",
-                       tag: &tag{
-                               name:      "test",
-                               valueType: pbv1.ValueTypeInt64,
-                               values: [][]byte{
-                                       convert.Int64ToBytes(1),
-                                       convert.Int64ToBytes(2),
-                                       convert.Int64ToBytes(4),
-                                       convert.Int64ToBytes(5),
-                               },
+               }
+
+               // Test encoding
+               bb := &pkgbytes.Buffer{}
+               err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+               assert.NoError(t, err)
+               assert.NotNil(t, bb.Buf)
+               assert.Greater(t, len(bb.Buf), 0)
+
+               // Test decoding
+               decoder := &pkgencoding.BytesBlockDecoder{}
+               decodedValues, err := encoding.DecodeTagValues(nil, decoder, 
bb, tag.valueType, len(tag.values))
+               assert.NoError(t, err)
+               assert.Equal(t, len(tag.values), len(decodedValues))
+               assert.Equal(t, tag.values, decodedValues)
+       })
+
+       t.Run("test string tag encoding/decoding", func(t *testing.T) {
+               tag := &tag{
+                       name:      "test_string",
+                       valueType: pbv1.ValueTypeStr,
+                       values: [][]byte{
+                               []byte("value1"),
+                               []byte("value2"),
+                               []byte("value3"),
                        },
-               },
-       }
-
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       tm := &tagMetadata{}
-
-                       buf, filterBuf := &bytes.Buffer{}, &bytes.Buffer{}
-                       w, fw := &writer{}, &writer{}
-                       w.init(buf)
-                       fw.init(filterBuf)
-
-                       tt.tag.mustWriteTo(tm, w)
-                       assert.Equal(t, w.bytesWritten, tm.size)
-                       assert.Equal(t, uint64(len(buf.Buf)), tm.size)
-                       assert.Equal(t, uint64(0), tm.offset)
-                       assert.Equal(t, tt.tag.name, tm.name)
-                       assert.Equal(t, tt.tag.valueType, tm.valueType)
-
-                       decoder := &encoding.BytesBlockDecoder{}
-                       unmarshaled := &tag{}
-                       unmarshaled.name = tm.name
-                       unmarshaled.valueType = tm.valueType
-                       unmarshaled.mustReadValues(decoder, buf, *tm, 
uint64(len(tt.tag.values)))
-                       assert.Equal(t, tt.tag.values, unmarshaled.values)
-               })
-       }
-}
-
-func TestTag_HighCardinalityStringEncoding(t *testing.T) {
-       tests := []struct {
-               name            string
-               description     string
-               expectedEncType encoding.EncodeType
-               uniqueCount     int
-               totalCount      int
-       }{
-               {
-                       name:            "exactly 256 unique values - should 
use dictionary",
-                       uniqueCount:     256,
-                       totalCount:      256,
-                       expectedEncType: encoding.EncodeTypeDictionary,
-                       description:     "Dictionary encoding should be used 
when exactly at the threshold",
-               },
-               {
-                       name:            "257 unique values - should use plain 
encoding",
-                       uniqueCount:     257,
-                       totalCount:      257,
-                       expectedEncType: encoding.EncodeTypePlain,
-                       description:     "Plain encoding should be used when 
exceeding dictionary threshold",
-               },
-               {
-                       name:            "300 unique values - should use plain 
encoding",
-                       uniqueCount:     300,
-                       totalCount:      300,
-                       expectedEncType: encoding.EncodeTypePlain,
-                       description:     "Plain encoding should be used for 
high cardinality strings",
-               },
-               {
-                       name:            "1000 unique values - should use plain 
encoding",
-                       uniqueCount:     1000,
-                       totalCount:      1000,
-                       expectedEncType: encoding.EncodeTypePlain,
-                       description:     "Plain encoding should be used for 
very high cardinality",
-               },
-               {
-                       name:            "500 total with 200 unique - should 
use dictionary",
-                       uniqueCount:     200,
-                       totalCount:      500,
-                       expectedEncType: encoding.EncodeTypeDictionary,
-                       description:     "Dictionary should be used when unique 
count is below threshold despite high total count",
-               },
-       }
-
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       // Generate unique string values
-                       values := make([][]byte, tt.totalCount)
-
-                       // Create unique values up to uniqueCount
-                       for i := 0; i < tt.uniqueCount; i++ {
-                               values[i] = 
[]byte(fmt.Sprintf("unique_value_%06d", i))
-                       }
-
-                       // If totalCount > uniqueCount, repeat some values to 
reach totalCount
-                       for i := tt.uniqueCount; i < tt.totalCount; i++ {
-                               // Repeat values cyclically
-                               repeatIndex := i % tt.uniqueCount
-                               values[i] = 
[]byte(fmt.Sprintf("unique_value_%06d", repeatIndex))
-                       }
-
-                       testTag := &tag{
-                               name:      "high_cardinality_tag",
-                               valueType: pbv1.ValueTypeStr,
-                               values:    values,
-                       }
-
-                       // Encode the tag
-                       tm := &tagMetadata{}
-                       buf, filterBuf := &bytes.Buffer{}, &bytes.Buffer{}
-                       w, fw := &writer{}, &writer{}
-                       w.init(buf)
-                       fw.init(filterBuf)
-
-                       testTag.mustWriteTo(tm, w)
-
-                       // Verify basic metadata
-                       assert.Equal(t, w.bytesWritten, tm.size)
-                       assert.Equal(t, uint64(len(buf.Buf)), tm.size)
-                       assert.Equal(t, uint64(0), tm.offset)
-                       assert.Equal(t, testTag.name, tm.name)
-                       assert.Equal(t, testTag.valueType, tm.valueType)
-
-                       // Check encoding type by examining the first byte of 
the encoded data
-                       assert.True(t, len(buf.Buf) > 0, "Encoded buffer should 
not be empty")
-                       actualEncType := encoding.EncodeType(buf.Buf[0])
-                       assert.Equal(t, tt.expectedEncType, actualEncType,
-                               "Expected %s encoding (%d), got %d. %s",
-                               getEncodeTypeName(tt.expectedEncType), 
tt.expectedEncType, actualEncType, tt.description)
-
-                       // Test roundtrip: decode and verify all values are 
preserved
-                       decoder := &encoding.BytesBlockDecoder{}
-                       unmarshaled := &tag{}
-                       unmarshaled.name = tm.name
-                       unmarshaled.valueType = tm.valueType
-                       unmarshaled.mustReadValues(decoder, buf, *tm, 
uint64(len(testTag.values)))
-
-                       assert.Equal(t, len(testTag.values), 
len(unmarshaled.values), "Number of values should match")
-
-                       // Verify all values are correctly decoded
-                       for i, originalValue := range testTag.values {
-                               assert.Equal(t, originalValue, 
unmarshaled.values[i],
-                                       "Value at index %d should match 
original", i)
-                       }
-               })
-       }
-}
-
-// Helper function to get encode type name for better test output.
-func getEncodeTypeName(encType encoding.EncodeType) string {
-       switch encType {
-       case encoding.EncodeTypePlain:
-               return "Plain"
-       case encoding.EncodeTypeDictionary:
-               return "Dictionary"
-       default:
-               return fmt.Sprintf("Unknown(%d)", encType)
-       }
+               }
+
+               // Test encoding
+               bb := &pkgbytes.Buffer{}
+               err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+               assert.NoError(t, err)
+               assert.NotNil(t, bb.Buf)
+               assert.Greater(t, len(bb.Buf), 0)
+
+               // Test decoding
+               decoder := &pkgencoding.BytesBlockDecoder{}
+               decodedValues, err := encoding.DecodeTagValues(nil, decoder, 
bb, tag.valueType, len(tag.values))
+               assert.NoError(t, err)
+               assert.Equal(t, len(tag.values), len(decodedValues))
+               assert.Equal(t, tag.values, decodedValues)
+       })
+
+       t.Run("test timestamp tag encoding/decoding", func(t *testing.T) {
+               // Create test timestamps
+               time1 := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC)
+               time2 := time.Date(2023, 1, 2, 12, 0, 0, 0, time.UTC)
+               time3 := time.Date(2023, 1, 3, 12, 0, 0, 0, time.UTC)
+
+               tag := &tag{
+                       name:      "test_timestamp",
+                       valueType: pbv1.ValueTypeTimestamp,
+                       values: [][]byte{
+                               timeToBytes(time1),
+                               timeToBytes(time2),
+                               timeToBytes(time3),
+                       },
+               }
+
+               // Test encoding
+               bb := &pkgbytes.Buffer{}
+               err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+               assert.NoError(t, err)
+               assert.NotNil(t, bb.Buf)
+               assert.Greater(t, len(bb.Buf), 0)
+
+               // Test decoding
+               decoder := &pkgencoding.BytesBlockDecoder{}
+               decodedValues, err := encoding.DecodeTagValues(nil, decoder, 
bb, tag.valueType, len(tag.values))
+               assert.NoError(t, err)
+               assert.Equal(t, len(tag.values), len(decodedValues))
+               assert.Equal(t, tag.values, decodedValues)
+
+               // Verify the decoded timestamp values can be converted back to 
time.Time
+               for i, decodedValue := range decodedValues {
+                       expectedTime := []time.Time{time1, time2, time3}[i]
+                       decodedNanos := pkgencoding.BytesToInt64(decodedValue)
+                       decodedTime := time.Unix(0, decodedNanos)
+                       assert.Equal(t, expectedTime.Unix(), decodedTime.Unix())
+                       // Note: We compare Unix time (seconds) since 
nanoseconds might have precision differences
+               }
+       })
+
+       t.Run("test empty values", func(t *testing.T) {
+               tag := &tag{
+                       name:      "test_empty",
+                       valueType: pbv1.ValueTypeStr,
+                       values:    [][]byte{},
+               }
+
+               // Test encoding
+               bb := &pkgbytes.Buffer{}
+               err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+               assert.NoError(t, err)
+               assert.Nil(t, bb.Buf)
+
+               // Test decoding
+               decoder := &pkgencoding.BytesBlockDecoder{}
+               decodedValues, err := encoding.DecodeTagValues(nil, decoder, 
bb, tag.valueType, 0)
+               assert.NoError(t, err)
+               assert.Nil(t, decodedValues)
+       })
 }
diff --git a/banyand/trace/timestamp_test.go b/banyand/trace/timestamp_test.go
new file mode 100644
index 00000000..b35a7f33
--- /dev/null
+++ b/banyand/trace/timestamp_test.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+func TestTimestampTagValueEncodingDecoding(t *testing.T) {
+       // Test timestamp encoding
+       t.Run("test timestamp tag encoding", func(t *testing.T) {
+               // Create a test timestamp
+               testTime := time.Date(2023, 1, 1, 12, 0, 0, 123456789, time.UTC)
+               timestampProto := timestamppb.New(testTime)
+
+               tagValue := &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Timestamp{
+                               Timestamp: timestampProto,
+                       },
+               }
+
+               // Encode the timestamp
+               encoded := encodeTagValue("test_timestamp", 
databasev1.TagType_TAG_TYPE_TIMESTAMP, tagValue)
+
+               // Verify encoding
+               assert.Equal(t, pbv1.ValueTypeTimestamp, encoded.valueType)
+               assert.Equal(t, "test_timestamp", encoded.tag)
+               assert.NotNil(t, encoded.value)
+
+               // The encoded value should be 8 bytes (int64 nanoseconds)
+               assert.Equal(t, 8, len(encoded.value))
+
+               // Verify the encoded value represents the correct nanoseconds
+               expectedNanos := testTime.UnixNano()
+               // Use the same conversion function to decode
+               decodedNanos := convert.BytesToInt64(encoded.value)
+               assert.Equal(t, expectedNanos, decodedNanos)
+       })
+
+       // Test timestamp decoding
+       t.Run("test timestamp tag decoding", func(t *testing.T) {
+               // Create test timestamp
+               testTime := time.Date(2023, 1, 1, 12, 0, 0, 123456789, time.UTC)
+               expectedNanos := testTime.UnixNano()
+
+               // Create encoded bytes using the same conversion function
+               encodedBytes := convert.Int64ToBytes(expectedNanos)
+
+               // Decode the timestamp
+               decoded := mustDecodeTagValue(pbv1.ValueTypeTimestamp, 
encodedBytes)
+
+               // Verify decoding
+               assert.NotNil(t, decoded)
+               assert.NotNil(t, decoded.GetTimestamp())
+
+               decodedTime := decoded.GetTimestamp()
+               assert.Equal(t, testTime.Unix(), decodedTime.Seconds)
+               assert.Equal(t, int32(123456789), decodedTime.Nanos)
+       })
+
+       // Test round-trip encoding and decoding
+       t.Run("test timestamp round-trip", func(t *testing.T) {
+               // Create a test timestamp
+               testTime := time.Date(2023, 1, 1, 12, 0, 0, 123456789, time.UTC)
+               timestampProto := timestamppb.New(testTime)
+
+               tagValue := &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Timestamp{
+                               Timestamp: timestampProto,
+                       },
+               }
+
+               // Encode
+               encoded := encodeTagValue("test_timestamp", 
databasev1.TagType_TAG_TYPE_TIMESTAMP, tagValue)
+
+               // Decode
+               decoded := mustDecodeTagValue(pbv1.ValueTypeTimestamp, 
encoded.value)
+
+               // Verify round-trip
+               assert.NotNil(t, decoded)
+               assert.NotNil(t, decoded.GetTimestamp())
+
+               decodedTime := decoded.GetTimestamp()
+               assert.Equal(t, testTime.Unix(), decodedTime.Seconds)
+               assert.Equal(t, int32(123456789), decodedTime.Nanos)
+       })
+
+       // Test nil timestamp handling
+       t.Run("test nil timestamp handling", func(t *testing.T) {
+               // Test encoding nil timestamp
+               encoded := encodeTagValue("test_timestamp", 
databasev1.TagType_TAG_TYPE_TIMESTAMP, nil)
+               assert.Equal(t, pbv1.ValueTypeTimestamp, encoded.valueType)
+               assert.Nil(t, encoded.value)
+
+               // Test decoding nil value
+               decoded := mustDecodeTagValue(pbv1.ValueTypeTimestamp, nil)
+               assert.Equal(t, pbv1.NullTagValue, decoded)
+       })
+}
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index b8812ee0..ed33c4f0 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -48,12 +48,11 @@ const (
 var traceScope = observability.RootScope.SubScope("trace")
 
 type option struct {
-       mergePolicy              *mergePolicy
-       protector                protector.Memory
-       tire2Client              queue.Client
-       seriesCacheMaxSize       run.Bytes
-       flushTimeout             time.Duration
-       elementIndexFlushTimeout time.Duration
+       mergePolicy        *mergePolicy
+       protector          protector.Memory
+       tire2Client        queue.Client
+       seriesCacheMaxSize run.Bytes
+       flushTimeout       time.Duration
 }
 
 // Service allows inspecting the trace data.
diff --git a/banyand/trace/trace_suite_test.go 
b/banyand/trace/trace_suite_test.go
index b64c502e..05140094 100644
--- a/banyand/trace/trace_suite_test.go
+++ b/banyand/trace/trace_suite_test.go
@@ -33,7 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
-       teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace"
 )
 
 func TestTrace(t *testing.T) {
@@ -57,7 +57,7 @@ func (p *preloadTraceService) Name() string {
 }
 
 func (p *preloadTraceService) PreRun(ctx context.Context) error {
-       return teststream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry())
+       return testtrace.PreloadSchema(ctx, p.metaSvc.SchemaRegistry())
 }
 
 type services struct {
diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go
index b166a9c3..93361ce2 100644
--- a/banyand/trace/traces.go
+++ b/banyand/trace/traces.go
@@ -157,7 +157,7 @@ func (t *traces) reset() {
        t.timestamps = t.timestamps[:0]
        for i := range t.tags {
                for j := range t.tags[i] {
-                       t.tags[i][j].reset()
+                       releaseTagValue(t.tags[i][j])
                }
        }
        t.tags = t.tags[:0]
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index 5825983e..1ed6f464 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -186,7 +186,7 @@ func processTraces(schemaRepo *schemaRepo, traces *traces, 
writeEvent *tracev1.I
        }
 
        is := stm.indexSchema.Load().(indexSchema)
-       if len(is.indexRuleLocators) != len(stm.GetSchema().GetTags()) {
+       if len(is.indexRuleLocators) > len(stm.GetSchema().GetTags()) {
                return fmt.Errorf("metadata crashed, tag rule length %d, tag 
length %d",
                        len(is.indexRuleLocators), 
len(stm.GetSchema().GetTags()))
        }
@@ -302,6 +302,14 @@ func encodeTagValue(name string, tagType 
databasev1.TagType, tagVal *modelv1.Tag
                for i := range tagVal.GetStrArray().Value {
                        tv.valueArr[i] = []byte(tagVal.GetStrArray().Value[i])
                }
+       case databasev1.TagType_TAG_TYPE_TIMESTAMP:
+               tv.valueType = pbv1.ValueTypeTimestamp
+               if tagVal.GetTimestamp() != nil {
+                       // Convert timestamp to 64-bit nanoseconds since epoch 
for efficient storage
+                       ts := tagVal.GetTimestamp()
+                       epochNanos := ts.Seconds*1e9 + int64(ts.Nanos)
+                       tv.value = convert.Int64ToBytes(epochNanos)
+               }
        default:
                logger.Panicf("unsupported tag value type: %T", 
tagVal.GetValue())
        }
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 90b9293e..fb6c2028 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -35,6 +35,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/query"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/banyand/trace"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/version"
@@ -58,6 +59,10 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
        }
+       traceSvc, err := trace.NewService(metaSvc, dataPipeline, metricSvc, pm)
+       if err != nil {
+               l.Fatal().Err(err).Msg("failed to initiate trace service")
+       }
        var srvMetrics *grpcprom.ServerMetrics
        srvMetrics.UnaryServerInterceptor()
        srvMetrics.UnaryServerInterceptor()
@@ -88,6 +93,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
                propertySvc,
                measureSvc,
                streamSvc,
+               traceSvc,
                q,
                grpcServer,
                httpServer,
diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go
index f13fc161..9d2eea50 100644
--- a/pkg/pb/v1/value.go
+++ b/pkg/pb/v1/value.go
@@ -23,6 +23,7 @@ import (
        "strconv"
 
        "github.com/pkg/errors"
+       "google.golang.org/protobuf/types/known/timestamppb"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -43,6 +44,7 @@ const (
        ValueTypeBinaryData
        ValueTypeStrArr
        ValueTypeInt64Arr
+       ValueTypeTimestamp
 )
 
 // MustTagValueToValueType converts modelv1.TagValue to ValueType.
@@ -60,6 +62,8 @@ func MustTagValueToValueType(tag *modelv1.TagValue) ValueType 
{
                return ValueTypeStrArr
        case *modelv1.TagValue_IntArray:
                return ValueTypeInt64Arr
+       case *modelv1.TagValue_Timestamp:
+               return ValueTypeTimestamp
        default:
                panic("unknown tag value type")
        }
@@ -78,6 +82,8 @@ func MustTagValueSpecToValueType(tag databasev1.TagType) 
ValueType {
                return ValueTypeStrArr
        case databasev1.TagType_TAG_TYPE_INT_ARRAY:
                return ValueTypeInt64Arr
+       case databasev1.TagType_TAG_TYPE_TIMESTAMP:
+               return ValueTypeTimestamp
        default:
                panic("unknown tag value type")
        }
@@ -92,6 +98,8 @@ func MustTagValueToStr(tag *modelv1.TagValue) string {
                return strconv.FormatInt(tag.GetInt().Value, 10)
        case *modelv1.TagValue_BinaryData:
                return fmt.Sprintf("%x", tag.GetBinaryData())
+       case *modelv1.TagValue_Timestamp:
+               return tag.GetTimestamp().String()
        default:
                panic("unknown tag value type")
        }
@@ -135,6 +143,12 @@ func marshalTagValue(dest []byte, tv *modelv1.TagValue) 
([]byte, error) {
                dest = marshalEntityValue(dest, encoding.Int64ToBytes(nil, 
tv.GetInt().Value))
        case *modelv1.TagValue_BinaryData:
                dest = marshalEntityValue(dest, tv.GetBinaryData())
+       case *modelv1.TagValue_Timestamp:
+               // Convert timestamp to 64-bit nanoseconds since epoch for 
efficient storage
+               ts := tv.GetTimestamp()
+               epochNanos := ts.Seconds*1e9 + int64(ts.Nanos)
+               tsBytes := encoding.Int64ToBytes(nil, epochNanos)
+               dest = marshalEntityValue(dest, tsBytes)
        default:
                return nil, errors.New("unsupported tag value type: " + 
tv.String())
        }
@@ -198,6 +212,25 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte, 
[]byte, *modelv1.TagVal
                                BinaryData: data,
                        },
                }, nil
+       case ValueTypeTimestamp:
+               if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != 
nil {
+                       return nil, nil, nil, errors.WithMessage(err, 
"unmarshal timestamp tag value")
+               }
+               // Unmarshal 64-bit epoch nanoseconds and convert back to 
seconds + nanos
+               if len(dest) < 8 { // Need at least 8 bytes for the 64-bit value
+                       return nil, src, nil, errors.New("insufficient bytes 
for timestamp")
+               }
+               epochNanos := encoding.BytesToInt64(dest)
+               seconds := epochNanos / 1e9
+               nanos := int32(epochNanos % 1e9)
+               return dest, src, &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Timestamp{
+                               Timestamp: &timestamppb.Timestamp{
+                                       Seconds: seconds,
+                                       Nanos:   nanos,
+                               },
+                       },
+               }, nil
        default:
                return nil, src, nil, fmt.Errorf("unsupported tag value type 
%d, tag value: %s", vt, src)
        }
@@ -286,6 +319,16 @@ func MustCompareTagValue(tv1, tv2 *modelv1.TagValue) int {
                 return int(tv1.GetInt().Value - tv2.GetInt().Value)
         case ValueTypeBinaryData:
                 return bytes.Compare(tv1.GetBinaryData(), tv2.GetBinaryData())
+       case ValueTypeTimestamp:
+               ts1 := tv1.GetTimestamp()
+               ts2 := tv2.GetTimestamp()
+               if ts1.Seconds != ts2.Seconds {
+                       if ts1.Seconds < ts2.Seconds {
+                               return -1
+                       }
+                       return 1
+               }
+               return int(ts1.Nanos - ts2.Nanos)
        default:
                logger.Panicf("unsupported tag value type: %v", vt1)
                return 0
diff --git a/pkg/pb/v1/value_test.go b/pkg/pb/v1/value_test.go
index 710fb2cf..6eeae75d 100644
--- a/pkg/pb/v1/value_test.go
+++ b/pkg/pb/v1/value_test.go
@@ -21,6 +21,7 @@ import (
        "testing"
 
        "github.com/stretchr/testify/assert"
+       "google.golang.org/protobuf/types/known/timestamppb"
 
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 )
@@ -42,6 +43,14 @@ func TestMarshalAndUnmarshalTagValue(t *testing.T) {
                        name: "binary data",
                        src:  &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: []byte("binaryData")}},
                },
+               {
+                       name: "timestamp value",
+                       src:  &modelv1.TagValue{Value: 
&modelv1.TagValue_Timestamp{Timestamp: &timestamppb.Timestamp{Seconds: 
1234567890, Nanos: 123456789}}},
+               },
+               {
+                       name: "timestamp value with high precision",
+                       src:  &modelv1.TagValue{Value: 
&modelv1.TagValue_Timestamp{Timestamp: &timestamppb.Timestamp{Seconds: 0, 
Nanos: 999999999}}},
+               },
                {
                        name: "unsupported type",
                        src:  &modelv1.TagValue{Value: 
&modelv1.TagValue_Null{}},
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 8cbe0a12..5407b402 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -33,14 +33,15 @@ type revisionContext struct {
        group            int64
        measure          int64
        stream           int64
+       trace            int64
        indexRule        int64
        indexRuleBinding int64
        topNAgg          int64
 }
 
 func (r revisionContext) String() string {
-       return fmt.Sprintf("Group: %d, Measure: %d, Stream: %d, IndexRule: %d, 
IndexRuleBinding: %d, TopNAgg: %d",
-               r.group, r.measure, r.stream, r.indexRule, r.indexRuleBinding, 
r.topNAgg)
+       return fmt.Sprintf("Group: %d, Measure: %d, Stream: %d, Trace: %d, 
IndexRule: %d, IndexRuleBinding: %d, TopNAgg: %d",
+               r.group, r.measure, r.stream, r.trace, r.indexRule, 
r.indexRuleBinding, r.topNAgg)
 }
 
 type revisionContextKey struct{}
@@ -48,7 +49,7 @@ type revisionContextKey struct{}
 var revCtxKey = revisionContextKey{}
 
 func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) {
-       if kind != schema.KindMeasure && kind != schema.KindStream {
+       if kind != schema.KindMeasure && kind != schema.KindStream && kind != 
schema.KindTrace {
                return nil, nil
        }
        catalog := sr.getCatalog(kind)
@@ -74,6 +75,10 @@ func (sr *schemaRepo) Init(kind schema.Kind) ([]string, 
[]int64) {
                sr.l.Info().Stringer("revision", revCtx).Msg("init measures")
                return groupNames, []int64{revCtx.group, revCtx.measure, 
revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg}
        }
+       if kind == schema.KindTrace {
+               sr.l.Info().Stringer("revision", revCtx).Msg("init trace")
+               return groupNames, []int64{revCtx.group, revCtx.trace, 
revCtx.indexRuleBinding, revCtx.indexRule}
+       }
        sr.l.Info().Stringer("revision", revCtx).Msg("init stream")
        return groupNames, []int64{revCtx.group, revCtx.stream, 
revCtx.indexRuleBinding, revCtx.indexRule}
 }
@@ -82,6 +87,9 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) 
commonv1.Catalog {
        if kind == schema.KindMeasure {
                return commonv1.Catalog_CATALOG_MEASURE
        }
+       if kind == schema.KindTrace {
+               return commonv1.Catalog_CATALOG_TRACE
+       }
        return commonv1.Catalog_CATALOG_STREAM
 }
 
@@ -96,6 +104,10 @@ func (sr *schemaRepo) processGroup(ctx context.Context, g 
*commonv1.Group, catal
                sr.processMeasure(ctx, g.Metadata.Name)
                return
        }
+       if catalog == commonv1.Catalog_CATALOG_TRACE {
+               sr.processTrace(ctx, g.Metadata.Name)
+               return
+       }
        sr.processStream(ctx, g.Metadata.Name)
 }
 
@@ -178,6 +190,27 @@ func (sr *schemaRepo) processStream(ctx context.Context, 
gName string) {
        sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(ss)).Msg("store streams")
 }
 
+func (sr *schemaRepo) processTrace(ctx context.Context, gName string) {
+       ctx, cancel := context.WithTimeout(ctx, initTimeout)
+       defer cancel()
+       start := time.Now()
+       tt, err := sr.metadata.TraceRegistry().ListTrace(ctx, 
schema.ListOpt{Group: gName})
+       if err != nil {
+               logger.Panicf("fails to get the traces: %v", err)
+               return
+       }
+       revCtx := ctx.Value(revCtxKey).(*revisionContext)
+       for _, t := range tt {
+               if err := sr.storeResource(t); err != nil {
+                       logger.Panicf("fails to store the trace: %v", err)
+               }
+               if t.Metadata.ModRevision > revCtx.trace {
+                       revCtx.trace = t.Metadata.ModRevision
+               }
+       }
+       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(tt)).Msg("store traces")
+}
+
 func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) {
        g, ok := sr.getGroup(groupSchema.Metadata.Name)
        if ok {
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 55ac3940..b54d74a3 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -42,6 +42,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
        test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
 )
 
 const host = "localhost"
@@ -51,6 +52,7 @@ func Standalone(flags ...string) (string, string, func()) {
        return StandaloneWithSchemaLoaders([]SchemaLoader{
                &preloadService{name: "stream"},
                &preloadService{name: "measure"},
+               &preloadService{name: "trace"},
        }, "", "", "", "", flags...)
 }
 
@@ -59,6 +61,7 @@ func StandaloneWithAuth(username, password string, flags 
...string) (string, str
        return StandaloneWithSchemaLoaders([]SchemaLoader{
                &preloadService{name: "stream"},
                &preloadService{name: "measure"},
+               &preloadService{name: "trace"},
        }, "", "", username, password, flags...)
 }
 
@@ -67,6 +70,7 @@ func StandaloneWithTLS(certFile, keyFile string, flags 
...string) (string, strin
        return StandaloneWithSchemaLoaders([]SchemaLoader{
                &preloadService{name: "stream"},
                &preloadService{name: "measure"},
+               &preloadService{name: "trace"},
        }, certFile, keyFile, "", "", flags...)
 }
 
@@ -99,6 +103,7 @@ func ClosableStandalone(path string, ports []int, flags 
...string) (string, stri
        return standaloneServer(path, ports, []SchemaLoader{
                &preloadService{name: "stream"},
                &preloadService{name: "measure"},
+               &preloadService{name: "trace"},
        }, "", "", flags...)
 }
 
@@ -134,6 +139,7 @@ func standaloneServerWithAuth(path string, ports []int, 
schemaLoaders []SchemaLo
                "--measure-root-path=" + path,
                "--metadata-root-path=" + path,
                "--property-root-path=" + path,
+               "--trace-root-path=" + path,
                fmt.Sprintf("--etcd-listen-client-url=%s", endpoint), fmt.Sprintf("--etcd-listen-peer-url=http://%s:%d", host, ports[3]),
        }
        tlsEnabled := false
@@ -202,6 +208,9 @@ func (p *preloadService) PreRun(ctx context.Context) error {
        if p.name == "stream" {
                return test_stream.PreloadSchema(ctx, p.registry)
        }
+       if p.name == "trace" {
+               return test_trace.PreloadSchema(ctx, p.registry)
+       }
        return test_measure.PreloadSchema(ctx, p.registry)
 }
 
diff --git a/pkg/test/trace/etcd.go b/pkg/test/trace/etcd.go
new file mode 100644
index 00000000..275c1765
--- /dev/null
+++ b/pkg/test/trace/etcd.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package trace implements helpers to load schemas for testing.
+package trace
+
+import (
+       "context"
+       "embed"
+       "path"
+
+       "github.com/pkg/errors"
+       "google.golang.org/protobuf/encoding/protojson"
+       "google.golang.org/protobuf/proto"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+const (
+       groupDir            = "testdata/groups"
+       traceDir            = "testdata/traces"
+       indexRuleDir        = "testdata/index_rules"
+       indexRuleBindingDir = "testdata/index_rule_bindings"
+)
+
+//go:embed testdata/*
+var store embed.FS
+
+// PreloadSchema loads schemas from files in the booting process.
+func PreloadSchema(ctx context.Context, e schema.Registry) error {
+       return loadAllSchemas(ctx, e)
+}
+
+// loadAllSchemas loads all trace-related schemas from the testdata directory.
+func loadAllSchemas(ctx context.Context, e schema.Registry) error {
+       return preloadSchemaWithFuncs(ctx, e,
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(groupDir, &commonv1.Group{}, 
func(group *commonv1.Group) error {
+                               return e.CreateGroup(ctx, group)
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(traceDir, &databasev1.Trace{}, 
func(trace *databasev1.Trace) error {
+                               _, innerErr := e.CreateTrace(ctx, trace)
+                               return innerErr
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(indexRuleDir, 
&databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error {
+                               return e.CreateIndexRule(ctx, indexRule)
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(indexRuleBindingDir, 
&databasev1.IndexRuleBinding{}, func(indexRuleBinding 
*databasev1.IndexRuleBinding) error {
+                               return e.CreateIndexRuleBinding(ctx, 
indexRuleBinding)
+                       })
+               },
+       )
+}
+
+// preloadSchemaWithFuncs extracts the common logic for loading schemas.
+func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders 
...func(context.Context, schema.Registry) error) error {
+       for _, loader := range loaders {
+               if err := loader(ctx, e); err != nil {
+                       return errors.WithStack(err)
+               }
+       }
+       return nil
+}
+
+func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource 
T) error) error {
+       entries, err := store.ReadDir(dir)
+       if err != nil {
+               return err
+       }
+       for _, entry := range entries {
+               data, err := store.ReadFile(path.Join(dir, entry.Name()))
+               if err != nil {
+                       return err
+               }
+
+               if err := protojson.Unmarshal(data, resource); err != nil {
+                       return err
+               }
+               if err := loadFn(resource); err != nil {
+                       if errors.Is(err, schema.ErrGRPCAlreadyExists) {
+                               // Schema already registered; skip it and keep
+                               // loading the remaining entries in this dir.
+                               continue
+                       }
+                       return err
+               }
+       }
+       return nil
+}
diff --git a/pkg/test/trace/testdata/groups/test-trace-group.json 
b/pkg/test/trace/testdata/groups/test-trace-group.json
new file mode 100644
index 00000000..3d7bbaf2
--- /dev/null
+++ b/pkg/test/trace/testdata/groups/test-trace-group.json
@@ -0,0 +1,19 @@
+{
+    "metadata": {
+        "name": "test-trace-group"
+    },
+    "catalog": "CATALOG_TRACE",
+    "resource_opts": {
+        "shard_num": 2,
+        "replicas": 1,
+        "segment_interval": {
+            "unit": "UNIT_DAY",
+            "num": 1
+        },
+        "ttl": {
+            "unit": "UNIT_DAY",
+            "num": 3
+        }
+    },
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rule_bindings/sw.json 
b/pkg/test/trace/testdata/index_rule_bindings/sw.json
new file mode 100644
index 00000000..7955985b
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rule_bindings/sw.json
@@ -0,0 +1,17 @@
+{
+    "metadata": {
+        "name": "sw-index-rule-binding",
+        "group": "test-trace-group"
+    },
+    "rules": [
+        "duration",
+        "timestamp"
+    ],
+    "subject": {
+        "catalog": "CATALOG_TRACE",
+        "name": "sw"
+    },
+    "begin_at": "2021-04-15T01:30:15.01Z",
+    "expire_at": "2121-04-15T01:30:15.01Z",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/duration.json 
b/pkg/test/trace/testdata/index_rules/duration.json
new file mode 100644
index 00000000..c9cb393a
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rules/duration.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "duration",
+        "group": "test-trace-group"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "duration"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/timestamp.json 
b/pkg/test/trace/testdata/index_rules/timestamp.json
new file mode 100644
index 00000000..be83e872
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rules/timestamp.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "timestamp",
+        "group": "test-trace-group"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "timestamp"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/sw.json 
b/pkg/test/trace/testdata/traces/sw.json
new file mode 100644
index 00000000..37e21ed3
--- /dev/null
+++ b/pkg/test/trace/testdata/traces/sw.json
@@ -0,0 +1,39 @@
+{
+    "metadata": {
+        "name": "sw",
+        "group": "test-trace-group"
+    },
+    "tags": [
+        {
+            "name": "trace_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "state",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "service_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "service_instance_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "endpoint_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "duration",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "timestamp",
+            "type": "TAG_TYPE_TIMESTAMP"
+        }
+    ],
+    "trace_id_tag_name": "trace_id",
+    "timestamp_tag_name": "timestamp",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/cases/init.go b/test/cases/init.go
index 5e7d0543..9d152f99 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -60,4 +60,8 @@ func Initialize(addr string, now time.Time) {
        casesmeasuredata.Write(conn, "duplicated", "exception", 
"duplicated.json", now, 0)
        casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated", 
"service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval)
        time.Sleep(5 * time.Second)
+       // trace
+       // nolint:gocritic
+       // interval = 500 * time.Millisecond
+       // casestrace.Write(conn, "sw", now, interval)
 }
diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go
new file mode 100644
index 00000000..fc70b23d
--- /dev/null
+++ b/test/cases/trace/data/data.go
@@ -0,0 +1,206 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package data is used to test the trace service.
+package data
+
+import (
+       "context"
+       "embed"
+       "encoding/base64"
+       "encoding/json"
+       "fmt"
+       "io"
+       "slices"
+       "strings"
+       "time"
+
+       "github.com/google/go-cmp/cmp"
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+       "google.golang.org/protobuf/testing/protocmp"
+       "google.golang.org/protobuf/types/known/timestamppb"
+       "sigs.k8s.io/yaml"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+)
+
+//go:embed input/*.yml
+var inputFS embed.FS
+
+//go:embed want/*.yml
+var wantFS embed.FS
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+// VerifyFn verify whether the query response matches the wanted result.
+var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, 
args helpers.Args) {
+       i, err := inputFS.ReadFile("input/" + args.Input + ".yml")
+       innerGm.Expect(err).NotTo(gm.HaveOccurred())
+       query := &tracev1.QueryRequest{}
+       helpers.UnmarshalYAML(i, query)
+       query.TimeRange = helpers.TimeRange(args, sharedContext)
+       query.Stages = args.Stages
+       c := tracev1.NewTraceServiceClient(sharedContext.Connection)
+       ctx := context.Background()
+       resp, err := c.Query(ctx, query)
+       if args.WantErr {
+               if err == nil {
+                       g.Fail("expect error")
+               }
+               return
+       }
+       innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
+       if args.WantEmpty {
+               innerGm.Expect(resp.Spans).To(gm.BeEmpty())
+               return
+       }
+       if args.Want == "" {
+               args.Want = args.Input
+       }
+       ww, err := wantFS.ReadFile("want/" + args.Want + ".yml")
+       innerGm.Expect(err).NotTo(gm.HaveOccurred())
+       want := &tracev1.QueryResponse{}
+       helpers.UnmarshalYAML(ww, want)
+       if args.DisOrder {
+               slices.SortFunc(want.Spans, func(a, b *tracev1.Span) int {
+                       // Sort by first tag value for consistency
+                       if len(a.Tags) > 0 && len(b.Tags) > 0 {
+                               return 
strings.Compare(a.Tags[0].Value.GetStr().GetValue(), 
b.Tags[0].Value.GetStr().GetValue())
+                       }
+                       return 0
+               })
+               slices.SortFunc(resp.Spans, func(a, b *tracev1.Span) int {
+                       if len(a.Tags) > 0 && len(b.Tags) > 0 {
+                               return 
strings.Compare(a.Tags[0].Value.GetStr().GetValue(), 
b.Tags[0].Value.GetStr().GetValue())
+                       }
+                       return 0
+               })
+       }
+       var extra []cmp.Option
+       extra = append(extra, protocmp.IgnoreUnknown(),
+               protocmp.Transform())
+       success := innerGm.Expect(cmp.Equal(resp, want,
+               extra...)).
+               To(gm.BeTrue(), func() string {
+                       var j []byte
+                       j, err = protojson.Marshal(resp)
+                       if err != nil {
+                               return err.Error()
+                       }
+                       var y []byte
+                       y, err = yaml.JSONToYAML(j)
+                       if err != nil {
+                               return err.Error()
+                       }
+                       return string(y)
+               })
+       if !success {
+               return
+       }
+       query.Trace = true
+       resp, err = c.Query(ctx, query)
+       innerGm.Expect(err).NotTo(gm.HaveOccurred())
+       innerGm.Expect(resp.TraceQueryResult).NotTo(gm.BeNil())
+       innerGm.Expect(resp.TraceQueryResult.GetSpans()).NotTo(gm.BeEmpty())
+}
+
+func loadData(stream tracev1.TraceService_WriteClient, metadata 
*commonv1.Metadata, dataFile string, baseTime time.Time, interval 
time.Duration) {
+       var templates []interface{}
+       content, err := dataFS.ReadFile("testdata/" + dataFile)
+       gm.Expect(err).ShouldNot(gm.HaveOccurred())
+       gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
+
+       for i, template := range templates {
+               // Extract span data from template
+               templateMap, ok := template.(map[string]interface{})
+               gm.Expect(ok).To(gm.BeTrue())
+
+               // Get span data
+               spanData, ok := templateMap["span"].(string)
+               gm.Expect(ok).To(gm.BeTrue())
+               spanBytes, err := base64.StdEncoding.DecodeString(spanData)
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+               // Get tags data
+               tagsData, ok := templateMap["tags"].([]interface{})
+               gm.Expect(ok).To(gm.BeTrue())
+
+               // Convert tags to TagValue format
+               var tagValues []*modelv1.TagValue
+               for _, tag := range tagsData {
+                       tagBytes, err := json.Marshal(tag)
+                       gm.Expect(err).ShouldNot(gm.HaveOccurred())
+                       tagValue := &modelv1.TagValue{}
+                       gm.Expect(protojson.Unmarshal(tagBytes, 
tagValue)).ShouldNot(gm.HaveOccurred())
+                       tagValues = append(tagValues, tagValue)
+               }
+
+               // Add timestamp tag as the last tag
+               timestamp := baseTime.Add(interval * time.Duration(i))
+               timestampTag := &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Timestamp{
+                               Timestamp: timestamppb.New(timestamp),
+                       },
+               }
+               tagValues = append(tagValues, timestampTag)
+
+               errInner := stream.Send(&tracev1.WriteRequest{
+                       Metadata: metadata,
+                       Tags:     tagValues,
+                       Span:     spanBytes,
+                       Version:  uint64(i + 1),
+               })
+               gm.Expect(errInner).ShouldNot(gm.HaveOccurred())
+       }
+}
+
+// Write writes trace data to the database.
+func Write(conn *grpclib.ClientConn, name string, baseTime time.Time, interval 
time.Duration) {
+       WriteToGroup(conn, name, "test-trace-group", name, baseTime, interval)
+}
+
+// WriteToGroup writes trace data to a specific group.
+func WriteToGroup(conn *grpclib.ClientConn, name, group, fileName string, 
baseTime time.Time, interval time.Duration) {
+       metadata := &commonv1.Metadata{
+               Name:  name,
+               Group: group,
+       }
+       schema := databasev1.NewTraceRegistryServiceClient(conn)
+       resp, err := schema.Get(context.Background(), 
&databasev1.TraceRegistryServiceGetRequest{Metadata: metadata})
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       metadata = resp.GetTrace().GetMetadata()
+
+       c := tracev1.NewTraceServiceClient(conn)
+       ctx := context.Background()
+       writeClient, err := c.Write(ctx)
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       loadData(writeClient, metadata, fmt.Sprintf("%s.json", fileName), 
baseTime, interval)
+       gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+       gm.Eventually(func() error {
+               _, err := writeClient.Recv()
+               return err
+       }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
diff --git a/test/cases/trace/data/input/all.yml 
b/test/cases/trace/data/input/all.yml
new file mode 100644
index 00000000..8a295c06
--- /dev/null
+++ b/test/cases/trace/data/input/all.yml
@@ -0,0 +1,20 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-group"]
+tag_projection: ["trace_id"]
diff --git a/test/cases/trace/data/testdata/sw.json 
b/test/cases/trace/data/testdata/sw.json
new file mode 100644
index 00000000..7213854a
--- /dev/null
+++ b/test/cases/trace/data/testdata/sw.json
@@ -0,0 +1,177 @@
+[
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "1"
+                }
+            },
+            {
+                "int": {
+                    "value": 1
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_1"
+                }
+            },
+            {
+                "str": {
+                    "value": "/home_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 1000
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    },
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "2"
+                }
+            },
+            {
+                "int": {
+                    "value": 1
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_2"
+                }
+            },
+            {
+                "str": {
+                    "value": "/product_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 500
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    },
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "3"
+                }
+            },
+            {
+                "int": {
+                    "value": 0
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_1"
+                }
+            },
+            {
+                "str": {
+                    "value": "/home_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 30
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    },
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "4"
+                }
+            },
+            {
+                "int": {
+                    "value": 0
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_3"
+                }
+            },
+            {
+                "str": {
+                    "value": "/price_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 60
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    },
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "5"
+                }
+            },
+            {
+                "int": {
+                    "value": 0
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_1"
+                }
+            },
+            {
+                "str": {
+                    "value": "/item_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 300
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    }
+]
\ No newline at end of file
diff --git a/test/cases/trace/data/want/all.yml 
b/test/cases/trace/data/want/all.yml
new file mode 100644
index 00000000..e29fbaf0
--- /dev/null
+++ b/test/cases/trace/data/want/all.yml
@@ -0,0 +1,48 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+spans:
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "1"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "2"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "3"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "4"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "5"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
new file mode 100644
index 00000000..dd7eb4db
--- /dev/null
+++ b/test/cases/trace/trace.go
@@ -0,0 +1,46 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package trace_test contains integration test cases for the trace model.
+package trace_test
+
+import (
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       trace_test_data 
"github.com/apache/skywalking-banyandb/test/cases/trace/data"
+)
+
+var (
+       // SharedContext is the parallel execution context.
+       SharedContext helpers.SharedContext
+       // verify binds the package-level SharedContext to VerifyFn so each
+       // table entry below only needs to supply its helpers.Args.
+       verify        = func(innerGm gm.Gomega, args helpers.Args) {
+               trace_test_data.VerifyFn(innerGm, SharedContext, args)
+       }
+)
+
+// The table retries each verification until the asynchronously written data
+// becomes queryable, bounded by the shared Eventually timeout.
+// NOTE: DescribeTable (not FDescribeTable) — a focused spec must not be
+// committed: it would skip every other spec in the suite and break CI runs
+// that use --fail-on-focused-specs.
+var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) {
+       gm.Eventually(func(innerGm gm.Gomega) {
+               verify(innerGm, args)
+       }, flags.EventuallyTimeout).Should(gm.Succeed())
+},
+       g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
+)
diff --git a/test/integration/standalone/query/query_suite_test.go 
b/test/integration/standalone/query/query_suite_test.go
index 7c576550..f760710a 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -82,6 +82,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                Connection: connection,
                BaseTime:   now,
        }
+       // casestrace.SharedContext = helpers.SharedContext{
+       //      Connection: connection,
+       //      BaseTime:   now,
+       // }
        Expect(err).NotTo(HaveOccurred())
 })
 


Reply via email to