This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/element
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 865433c332a028ba4506cacd2a2fbbd4abd02ad8
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sat Aug 16 08:52:54 2025 +0800

    Add tag handling functionality with comprehensive tests
    
    - Introduced `tag.go` and `tag_test.go` files to implement and test tag 
metadata and data structures.
---
 .../tag.go => internal/encoding/tag_encoder.go}    | 368 +++++------
 banyand/internal/sidx/TODO.md                      |  25 +-
 banyand/internal/sidx/element.go                   |   3 +
 banyand/internal/sidx/element_test.go              |  10 +-
 banyand/internal/sidx/tag.go                       | 454 +++++++++++++
 banyand/internal/sidx/tag_test.go                  | 724 +++++++++++++++++++++
 banyand/stream/tag.go                              | 317 +--------
 banyand/stream/tag_test.go                         |  19 +-
 8 files changed, 1406 insertions(+), 514 deletions(-)

diff --git a/banyand/stream/tag.go b/banyand/internal/encoding/tag_encoder.go
similarity index 50%
copy from banyand/stream/tag.go
copy to banyand/internal/encoding/tag_encoder.go
index 4ad42dd4..fe5160a0 100644
--- a/banyand/stream/tag.go
+++ b/banyand/internal/encoding/tag_encoder.go
@@ -6,7 +6,7 @@
 // not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-//     http://www.apache.org/licenses/LICENSE-2.0
+//     http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
@@ -15,18 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package stream
+// Package encoding provides tag value encoding functionality with optimal 
compression
+// for different data types including int64, float64, and other types using 
dictionary
+// encoding with fallback to plain encoding with zstd compression.
+package encoding
 
 import (
        "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
-       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
+const (
+       defaultCompressionLevel = 3
+)
+
+var (
+       int64SlicePool   = pool.Register[*[]int64]("tag-encoder-int64Slice")
+       float64SlicePool = pool.Register[*[]float64]("tag-encoder-float64Slice")
+       dictionaryPool   = 
pool.Register[*encoding.Dictionary]("tag-encoder-dictionary")
+       bigValuePool     = bytes.NewBufferPool("tag-encoder-big-value")
+)
+
 func generateInt64Slice(length int) *[]int64 {
        v := int64SlicePool.Get()
        if v == nil {
@@ -78,148 +92,124 @@ func releaseDictionary(d *encoding.Dictionary) {
        dictionaryPool.Put(d)
 }
 
-var (
-       int64SlicePool   = pool.Register[*[]int64]("stream-int64Slice")
-       float64SlicePool = pool.Register[*[]float64]("stream-float64Slice")
-       dictionaryPool   = 
pool.Register[*encoding.Dictionary]("stream-dictionary")
-)
-
-type tag struct {
-       tagFilter
-       name      string
-       values    [][]byte
-       valueType pbv1.ValueType
-}
-
-func (t *tag) reset() {
-       t.name = ""
-
-       values := t.values
-       for i := range values {
-               values[i] = nil
+// EncodeTagValues encodes tag values based on the value type with optimal 
compression.
+// For int64: uses delta encoding with first value storage.
+// For float64: converts to decimal integers with exponent, then delta 
encoding.
+// For other types: uses dictionary encoding, falls back to plain with zstd 
compression.
+func EncodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, 
error) {
+       if len(values) == 0 {
+               return nil, nil
        }
-       t.values = values[:0]
 
-       t.tagFilter.reset()
-}
+       bb := bigValuePool.Generate()
+       defer bigValuePool.Release(bb)
 
-func (t *tag) resizeValues(valuesLen int) [][]byte {
-       values := t.values
-       if n := valuesLen - cap(values); n > 0 {
-               values = append(values[:cap(values)], make([][]byte, n)...)
+       switch valueType {
+       case pbv1.ValueTypeInt64:
+               return encodeInt64TagValues(bb, values)
+       case pbv1.ValueTypeFloat64:
+               return encodeFloat64TagValues(bb, values)
+       default:
+               return encodeDefaultTagValues(bb, values)
        }
-       values = values[:valuesLen]
-       t.values = values
-       return values
 }
 
-func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, tagFilterWriter 
*writer) {
-       tm.reset()
-
-       tm.name = t.name
-       tm.valueType = t.valueType
+// DecodeTagValues decodes tag values based on the value type.
+func DecodeTagValues(data []byte, valueType pbv1.ValueType, count int) 
([][]byte, error) {
+       if len(data) == 0 {
+               return nil, nil
+       }
 
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
+       bb.Buf = append(bb.Buf[:0], data...)
+
+       decoder := &encoding.BytesBlockDecoder{}
+       values := make([][]byte, 0, count)
 
-       // select encoding based on data type
-       switch t.valueType {
+       switch valueType {
        case pbv1.ValueTypeInt64:
-               t.encodeInt64Tag(bb)
+               return decodeInt64TagValues(decoder, bb, uint64(count))
        case pbv1.ValueTypeFloat64:
-               t.encodeFloat64Tag(bb)
+               return decodeFloat64TagValues(decoder, bb, uint64(count))
        default:
-               t.encodeDefault(bb)
-       }
-       tm.size = uint64(len(bb.Buf))
-       if tm.size > maxValuesBlockSize {
-               logger.Panicf("too large valuesSize: %d bytes; mustn't exceed 
%d bytes", tm.size, maxValuesBlockSize)
-       }
-       tm.offset = tagWriter.bytesWritten
-       tagWriter.MustWrite(bb.Buf)
-
-       if t.filter != nil {
-               bb.Reset()
-               bb.Buf = encodeBloomFilter(bb.Buf[:0], t.filter)
-               if tm.valueType == pbv1.ValueTypeInt64 {
-                       tm.min = t.min
-                       tm.max = t.max
-               }
-               tm.filterBlock.size = uint64(len(bb.Buf))
-               tm.filterBlock.offset = tagFilterWriter.bytesWritten
-               tagFilterWriter.MustWrite(bb.Buf)
+               return decodeDefaultTagValues(decoder, bb, uint64(count), 
values)
        }
 }
 
-func (t *tag) encodeInt64Tag(bb *bytes.Buffer) {
-       // convert byte array to int64 array
-       intValuesPtr := generateInt64Slice(len(t.values))
+func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) {
+       intValuesPtr := generateInt64Slice(len(values))
        intValues := *intValuesPtr
        defer releaseInt64Slice(intValuesPtr)
        var encodeType encoding.EncodeType
 
-       for i, v := range t.values {
+       for i, v := range values {
                if v == nil || string(v) == "null" {
-                       t.encodeDefault(bb)
-                       encodeType = encoding.EncodeTypePlain
-                       // Prepend encodeType (1 byte) to the beginning
-                       bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...)
-                       return
+                       // Handle null values by falling back to default 
encoding
+                       bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
+                       bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
+                       // Apply zstd compression for plain encoding
+                       compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
+                       return compressed, nil
                }
                if len(v) != 8 {
                        logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
                }
                intValues[i] = convert.BytesToInt64(v)
        }
-       // use delta encoding for integer column
+
        var firstValue int64
        bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], 
intValues)
        if encodeType == encoding.EncodeTypeUnknown {
                logger.Panicf("invalid encode type for int64 values")
        }
        firstValueBytes := convert.Int64ToBytes(firstValue)
+
        // Prepend encodeType (1 byte) and firstValue (8 bytes) to the beginning
-       bb.Buf = append(
+       result := append(
                append([]byte{byte(encodeType)}, firstValueBytes...),
                bb.Buf...,
        )
+       return result, nil
 }
 
-func (t *tag) encodeFloat64Tag(bb *bytes.Buffer) {
-       // convert byte array to float64 array
-       intValuesPtr := generateInt64Slice(len(t.values))
+func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) 
{
+       intValuesPtr := generateInt64Slice(len(values))
        intValues := *intValuesPtr
        defer releaseInt64Slice(intValuesPtr)
 
-       floatValuesPtr := generateFloat64Slice(len(t.values))
+       floatValuesPtr := generateFloat64Slice(len(values))
        floatValues := *floatValuesPtr
        defer releaseFloat64Slice(floatValuesPtr)
 
        var encodeType encoding.EncodeType
 
-       doEncodeDefault := func() {
-               t.encodeDefault(bb)
-               encodeType = encoding.EncodeTypePlain
-               // Prepend encodeType (1 byte) to the beginning
-               bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...)
-       }
-
-       for i, v := range t.values {
+       for i, v := range values {
                if v == nil || string(v) == "null" {
-                       doEncodeDefault()
-                       return
+                       // Handle null values by falling back to default 
encoding
+                       bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
+                       bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
+                       // Apply zstd compression for plain encoding
+                       compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
+                       return compressed, nil
                }
                if len(v) != 8 {
                        logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
                }
                floatValues[i] = convert.BytesToFloat64(v)
        }
+
        intValues, exp, err := 
encoding.Float64ListToDecimalIntList(intValues[:0], floatValues)
        if err != nil {
-               logger.Errorf("cannot convert Float64List to DecimalIntList : 
%v", err)
-               doEncodeDefault()
-               return
+               logger.Errorf("cannot convert Float64List to DecimalIntList: 
%v", err)
+               // Handle error by falling back to default encoding
+               bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
+               bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
+               // Apply zstd compression for plain encoding
+               compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
+               return compressed, nil
        }
+
        var firstValue int64
        bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], 
intValues)
        if encodeType == encoding.EncodeTypeUnknown {
@@ -227,80 +217,37 @@ func (t *tag) encodeFloat64Tag(bb *bytes.Buffer) {
        }
        firstValueBytes := convert.Int64ToBytes(firstValue)
        expBytes := convert.Int16ToBytes(exp)
+
        // Prepend encodeType (1 byte), exp (2 bytes) and firstValue (8 bytes) 
to the beginning
-       bb.Buf = append(
+       result := append(
                append(append([]byte{byte(encodeType)}, expBytes...), 
firstValueBytes...),
                bb.Buf...,
        )
+       return result, nil
 }
 
-func (t *tag) encodeDefault(bb *bytes.Buffer) {
+func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) 
{
        dict := generateDictionary()
        defer releaseDictionary(dict)
-       for _, v := range t.values {
+
+       for _, v := range values {
                if !dict.Add(v) {
-                       bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], t.values)
+                       // Dictionary encoding failed, use plain encoding with 
zstd compression
+                       bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
                        bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
-                       return
+                       // Apply zstd compression for plain encoding
+                       compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
+                       return compressed, nil
                }
        }
+
+       // Dictionary encoding succeeded
        bb.Buf = dict.Encode(bb.Buf[:0])
        bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...)
+       return append([]byte(nil), bb.Buf...), nil
 }
 
-func (t *tag) mustReadValues(decoder *encoding.BytesBlockDecoder, reader 
fs.Reader, cm tagMetadata, count uint64) {
-       t.name = cm.name
-       t.valueType = cm.valueType
-       if t.valueType == pbv1.ValueTypeUnknown {
-               for i := uint64(0); i < count; i++ {
-                       t.values = append(t.values, nil)
-               }
-               return
-       }
-
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
-       valuesSize := cm.size
-       if valuesSize > maxValuesBlockSize {
-               logger.Panicf("%s: block size cannot exceed %d bytes; got %d 
bytes", reader.Path(), maxValuesBlockSize, valuesSize)
-       }
-       bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize))
-       fs.MustReadData(reader, int64(cm.offset), bb.Buf)
-       t.decodeTagValues(decoder, reader.Path(), count, bb)
-}
-
-func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader 
*seqReader, cm tagMetadata, count uint64) {
-       t.name = cm.name
-       t.valueType = cm.valueType
-       if cm.offset != reader.bytesRead {
-               logger.Panicf("%s: offset mismatch: %d vs %d", reader.Path(), 
cm.offset, reader.bytesRead)
-       }
-       valuesSize := cm.size
-       if valuesSize > maxValuesBlockSize {
-               logger.Panicf("%s: block size cannot exceed %d bytes; got %d 
bytes", reader.Path(), maxValuesBlockSize, valuesSize)
-       }
-
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
-
-       bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize))
-       reader.mustReadFull(bb.Buf)
-       t.decodeTagValues(decoder, reader.Path(), count, bb)
-}
-
-func (t *tag) decodeTagValues(decoder *encoding.BytesBlockDecoder, path 
string, count uint64, bb *bytes.Buffer) {
-       switch t.valueType {
-       case pbv1.ValueTypeInt64:
-               t.decodeInt64Tag(decoder, path, count, bb)
-       case pbv1.ValueTypeFloat64:
-               t.decodeFloat64Tag(decoder, path, count, bb)
-       default:
-               t.decodeDefault(decoder, bb, count, path)
-       }
-}
-
-func (t *tag) decodeInt64Tag(decoder *encoding.BytesBlockDecoder, path string, 
count uint64, bb *bytes.Buffer) {
-       // decode integer type
+func decodeInt64TagValues(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64) ([][]byte, error) {
        intValuesPtr := generateInt64Slice(int(count))
        intValues := *intValuesPtr
        defer releaseInt64Slice(intValuesPtr)
@@ -308,11 +255,27 @@ func (t *tag) decodeInt64Tag(decoder 
*encoding.BytesBlockDecoder, path string, c
        if len(bb.Buf) < 1 {
                logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", 1, len(bb.Buf))
        }
+
+       // Check if this is zstd compressed data (no encode type prefix)
+       decompressed, err := zstd.Decompress(nil, bb.Buf)
+       if err == nil {
+               // Successfully decompressed, this is compressed data
+               bb.Buf = decompressed
+               if len(bb.Buf) < 1 {
+                       logger.Panicf("decompressed data too short: expect at 
least %d bytes, but got %d bytes", 1, len(bb.Buf))
+               }
+       }
+
        encodeType := encoding.EncodeType(bb.Buf[0])
        if encodeType == encoding.EncodeTypePlain {
                bb.Buf = bb.Buf[1:]
-               t.decodeDefault(decoder, bb, count, path)
-               return
+               // Data is already decompressed, decode directly without trying 
to decompress again
+               values := make([][]byte, 0, count)
+               values, decodeErr := decoder.Decode(values[:0], bb.Buf, count)
+               if decodeErr != nil {
+                       logger.Panicf("cannot decode values: %v", decodeErr)
+               }
+               return values, nil
        }
 
        const expectedLen = 9
@@ -321,23 +284,25 @@ func (t *tag) decodeInt64Tag(decoder 
*encoding.BytesBlockDecoder, path string, c
        }
        firstValue := convert.BytesToInt64(bb.Buf[1:9])
        bb.Buf = bb.Buf[9:]
-       var err error
+
        intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, 
encodeType, firstValue, int(count))
        if err != nil {
-               logger.Panicf("%s: cannot decode int values: %v", path, err)
+               logger.Panicf("cannot decode int values: %v", err)
        }
-       // convert int64 array to byte array
-       t.values = make([][]byte, count)
+
+       // Convert int64 array to byte array
+       values := make([][]byte, count)
        for i, v := range intValues {
-               t.values[i] = convert.Int64ToBytes(v)
+               values[i] = convert.Int64ToBytes(v)
        }
+       return values, nil
 }
 
-func (t *tag) decodeFloat64Tag(decoder *encoding.BytesBlockDecoder, path 
string, count uint64, bb *bytes.Buffer) {
-       // decode float type
+func decodeFloat64TagValues(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64) ([][]byte, error) {
        intValuesPtr := generateInt64Slice(int(count))
        intValues := *intValuesPtr
        defer releaseInt64Slice(intValuesPtr)
+
        floatValuesPtr := generateFloat64Slice(int(count))
        floatValues := *floatValuesPtr
        defer releaseFloat64Slice(floatValuesPtr)
@@ -345,11 +310,27 @@ func (t *tag) decodeFloat64Tag(decoder 
*encoding.BytesBlockDecoder, path string,
        if len(bb.Buf) < 1 {
                logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", 1, len(bb.Buf))
        }
+
+       // Check if this is zstd compressed data (no encode type prefix)
+       decompressed, err := zstd.Decompress(nil, bb.Buf)
+       if err == nil {
+               // Successfully decompressed, this is compressed data
+               bb.Buf = decompressed
+               if len(bb.Buf) < 1 {
+                       logger.Panicf("decompressed data too short: expect at 
least %d bytes, but got %d bytes", 1, len(bb.Buf))
+               }
+       }
+
        encodeType := encoding.EncodeType(bb.Buf[0])
        if encodeType == encoding.EncodeTypePlain {
                bb.Buf = bb.Buf[1:]
-               t.decodeDefault(decoder, bb, count, path)
-               return
+               // Data is already decompressed, decode directly without trying 
to decompress again
+               values := make([][]byte, 0, count)
+               values, decodeErr := decoder.Decode(values[:0], bb.Buf, count)
+               if decodeErr != nil {
+                       logger.Panicf("cannot decode values: %v", decodeErr)
+               }
+               return values, nil
        }
 
        const expectedLen = 11
@@ -359,63 +340,60 @@ func (t *tag) decodeFloat64Tag(decoder 
*encoding.BytesBlockDecoder, path string,
        exp := convert.BytesToInt16(bb.Buf[1:3])
        firstValue := convert.BytesToInt64(bb.Buf[3:11])
        bb.Buf = bb.Buf[11:]
-       var err error
+
        intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, 
encodeType, firstValue, int(count))
        if err != nil {
-               logger.Panicf("%s: cannot decode int values: %v", path, err)
+               logger.Panicf("cannot decode int values: %v", err)
        }
+
        floatValues, err = 
encoding.DecimalIntListToFloat64List(floatValues[:0], intValues, exp, 
int(count))
        if err != nil {
                logger.Panicf("cannot convert DecimalIntList to Float64List: 
%v", err)
        }
+
        if uint64(len(floatValues)) != count {
                logger.Panicf("unexpected floatValues length: got %d, expected 
%d", len(floatValues), count)
        }
-       // convert float64 array to byte array
-       t.values = make([][]byte, count)
+
+       // Convert float64 array to byte array
+       values := make([][]byte, count)
        for i, v := range floatValues {
-               t.values[i] = convert.Float64ToBytes(v)
+               values[i] = convert.Float64ToBytes(v)
        }
+       return values, nil
 }
 
-func (t *tag) decodeDefault(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64, path string) {
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       var err error
-       if encodeType == encoding.EncodeTypeDictionary {
-               dict := generateDictionary()
-               defer releaseDictionary(dict)
-               t.values, err = dict.Decode(t.values[:0], bb.Buf[1:], count)
-       } else {
-               t.values, err = decoder.Decode(t.values[:0], bb.Buf[1:], count)
-       }
-       if err != nil {
-               logger.Panicf("%s: cannot decode values: %v", path, err)
+func decodeDefaultTagValues(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64, values [][]byte) ([][]byte, error) {
+       if len(bb.Buf) < 1 {
+               return values, nil
        }
-}
-
-var bigValuePool = bytes.NewBufferPool("stream-big-value")
 
-type tagFamily struct {
-       name string
-       tags []tag
-}
+       // Check if this is zstd compressed data (no encode type prefix)
+       decompressed, decompErr := zstd.Decompress(nil, bb.Buf)
+       if decompErr == nil {
+               // Successfully decompressed, this is compressed data
+               bb.Buf = decompressed
+               if len(bb.Buf) < 1 {
+                       return values, nil
+               }
+       }
 
-func (tf *tagFamily) reset() {
-       tf.name = ""
+       encodeType := encoding.EncodeType(bb.Buf[0])
+       var err error
 
-       tags := tf.tags
-       for i := range tags {
-               tags[i].reset()
+       switch encodeType {
+       case encoding.EncodeTypeDictionary:
+               dict := generateDictionary()
+               defer releaseDictionary(dict)
+               values, err = dict.Decode(values[:0], bb.Buf[1:], count)
+       case encoding.EncodeTypePlain:
+               values, err = decoder.Decode(values[:0], bb.Buf[1:], count)
+       default:
+               values, err = decoder.Decode(values[:0], bb.Buf[1:], count)
        }
-       tf.tags = tags[:0]
-}
 
-func (tf *tagFamily) resizeTags(tagsLen int) []tag {
-       tags := tf.tags
-       if n := tagsLen - cap(tags); n > 0 {
-               tags = append(tags[:cap(tags)], make([]tag, n)...)
+       if err != nil {
+               logger.Panicf("cannot decode values: %v", err)
        }
-       tags = tags[:tagsLen]
-       tf.tags = tags
-       return tags
+       return values, nil
 }
diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
index 70999412..6b59aee4 100644
--- a/banyand/internal/sidx/TODO.md
+++ b/banyand/internal/sidx/TODO.md
@@ -4,7 +4,7 @@ This document tracks the implementation progress of the 
Secondary Index File Sys
 
 ## Implementation Progress Overview
 
-- [x] **Phase 1**: Core Data Structures (6 tasks) - 1/6 completed
+- [x] **Phase 1**: Core Data Structures (6 tasks) - 2/6 completed
 - [ ] **Phase 2**: Memory Management (4 tasks) 
 - [ ] **Phase 3**: Snapshot Management (4 tasks)
 - [ ] **Phase 4**: Write Path (4 tasks)
@@ -31,15 +31,15 @@ This document tracks the implementation progress of the 
Secondary Index File Sys
   - [x] Memory reuse reduces allocations
   - [x] Size calculation accuracy
 
-### 1.2 Tag Structure (`tag.go`)
-- [ ] Individual tag handling (not tag families like stream module)
-- [ ] Support for tag data, metadata, filter files
-- [ ] Implement tag value marshaling/unmarshaling
-- [ ] **Test Cases**:
-  - [ ] Tag encoding/decoding with various value types
-  - [ ] Value type handling (int64, string, bytes)
-  - [ ] Filter generation for indexed tags
-  - [ ] Tag serialization round-trip integrity
+### 1.2 Tag Structure (`tag.go`) ✅
+- [x] Individual tag handling (not tag families like stream module)
+- [x] Support for tag data, metadata, filter files
+- [x] Implement tag value marshaling/unmarshaling
+- [x] **Test Cases**:
+  - [x] Tag encoding/decoding with various value types
+  - [x] Value type handling (int64, string, bytes)
+  - [x] Filter generation for indexed tags
+  - [x] Tag serialization round-trip integrity
 
 ### 1.3 PartWrapper with Reference Counting (`part_wrapper.go`)
 - [ ] Atomic reference counting for safe concurrent access
@@ -76,11 +76,16 @@ This document tracks the implementation progress of the 
Secondary Index File Sys
 - [ ] Contains userKeys[], seriesIDs[], data[], tags[]
 - [ ] Methods: reset(), mustInitFromElements(), validation
 - [ ] Sorting validation for elements within blocks
+- [ ] **Tag encoding/decoding**: Uses shared encoding module from 
`banyand/internal/encoding/tag_encoder.go`
+  - [ ] Implement `block.marshal()` and `block.unmarshal()` methods
+  - [ ] Use `EncodeTagValues()` and `DecodeTagValues()` for tag serialization
+  - [ ] Apply sophisticated encoding: delta for int64, dictionary for strings, 
zstd for plain
 - [ ] **Test Cases**:
   - [ ] Block initialization from sorted elements
   - [ ] Element organization by seriesID and userKey
   - [ ] Key ordering validation within blocks
   - [ ] Block reset and reuse functionality
+  - [ ] Tag encoding/decoding with various value types
 
 ---
 
diff --git a/banyand/internal/sidx/element.go b/banyand/internal/sidx/element.go
index ffacf2f0..289bab34 100644
--- a/banyand/internal/sidx/element.go
+++ b/banyand/internal/sidx/element.go
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package sidx provides secondary index functionality for BanyanDB, including
+// element management, pooling, and sorting capabilities for efficient data 
storage
+// and retrieval operations.
 package sidx
 
 import (
diff --git a/banyand/internal/sidx/element_test.go 
b/banyand/internal/sidx/element_test.go
index 781198b2..4f766b32 100644
--- a/banyand/internal/sidx/element_test.go
+++ b/banyand/internal/sidx/element_test.go
@@ -349,28 +349,28 @@ func TestElementsSorting(t *testing.T) {
 }
 
 func TestNilSafety(t *testing.T) {
-       t.Run("release nil element", func(t *testing.T) {
+       t.Run("release nil element", func(_ *testing.T) {
                // Should not panic
                releaseElement(nil)
        })
 
-       t.Run("release nil elements", func(t *testing.T) {
+       t.Run("release nil elements", func(_ *testing.T) {
                // Should not panic
                releaseElements(nil)
        })
 
-       t.Run("release nil tag", func(t *testing.T) {
+       t.Run("release nil tag", func(_ *testing.T) {
                // Should not panic
                releaseTag(nil)
        })
 
-       t.Run("release non-pooled element", func(t *testing.T) {
+       t.Run("release non-pooled element", func(_ *testing.T) {
                e := &element{pooled: false}
                // Should not panic or add to pool
                releaseElement(e)
        })
 
-       t.Run("release non-pooled elements", func(t *testing.T) {
+       t.Run("release non-pooled elements", func(_ *testing.T) {
                es := &elements{pooled: false}
                // Should not panic or add to pool
                releaseElements(es)
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
new file mode 100644
index 00000000..38749831
--- /dev/null
+++ b/banyand/internal/sidx/tag.go
@@ -0,0 +1,454 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/filter"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+)
+
+// dataBlock represents a reference to data in a file.
+type dataBlock struct {
+       offset uint64
+       size   uint64
+}
+
+// tagMetadata contains persistent metadata for a tag.
+type tagMetadata struct {
+       name        string
+       min         []byte    // For int64 tags
+       max         []byte    // For int64 tags
+       dataBlock   dataBlock // Offset/size in .td file
+       filterBlock dataBlock // Offset/size in .tf file
+       valueType   pbv1.ValueType
+       indexed     bool
+       compressed  bool
+}
+
+// tagData represents the runtime data for a tag with filtering capabilities.
+type tagData struct {
+       values    [][]byte
+       filter    *filter.BloomFilter // For indexed tags
+       name      string
+       min       []byte // For int64 tags
+       max       []byte // For int64 tags
+       valueType pbv1.ValueType
+       indexed   bool
+}
+
+var (
+       tagDataPool     = pool.Register[*tagData]("sidx-tagData")
+       tagMetadataPool = pool.Register[*tagMetadata]("sidx-tagMetadata")
+       bloomFilterPool = pool.Register[*filter.BloomFilter]("sidx-bloomFilter")
+)
+
+// generateTagData gets a tagData from pool or creates new.
+func generateTagData() *tagData {
+       v := tagDataPool.Get()
+       if v == nil {
+               return &tagData{}
+       }
+       return v
+}
+
+// releaseTagData returns tagData to pool after reset.
+func releaseTagData(td *tagData) {
+       if td == nil {
+               return
+       }
+       td.reset()
+       tagDataPool.Put(td)
+}
+
+// generateTagMetadata gets a tagMetadata from pool or creates new.
+func generateTagMetadata() *tagMetadata {
+       v := tagMetadataPool.Get()
+       if v == nil {
+               return &tagMetadata{}
+       }
+       return v
+}
+
+// releaseTagMetadata returns tagMetadata to pool after reset.
+func releaseTagMetadata(tm *tagMetadata) {
+       if tm == nil {
+               return
+       }
+       tm.reset()
+       tagMetadataPool.Put(tm)
+}
+
+// reset clears tagData for reuse in object pool.
+func (td *tagData) reset() {
+       td.name = ""
+       td.valueType = pbv1.ValueTypeUnknown
+       td.indexed = false
+
+       // Reset values slice
+       for i := range td.values {
+               td.values[i] = nil
+       }
+       td.values = td.values[:0]
+
+       // Reset filter
+       if td.filter != nil {
+               releaseBloomFilter(td.filter)
+               td.filter = nil
+       }
+
+       // Reset min/max
+       td.min = nil
+       td.max = nil
+}
+
+// reset clears tagMetadata for reuse in object pool.
+func (tm *tagMetadata) reset() {
+       tm.name = ""
+       tm.valueType = pbv1.ValueTypeUnknown
+       tm.indexed = false
+       tm.compressed = false
+       tm.dataBlock = dataBlock{}
+       tm.filterBlock = dataBlock{}
+       tm.min = nil
+       tm.max = nil
+}
+
+const (
+       // defaultCompressionLevel for zstd compression.
+       defaultCompressionLevel = 3
+)
+
+// compressTagData compresses tag data using zstd compression.
+func compressTagData(data []byte) []byte {
+       if len(data) == 0 {
+               return nil
+       }
+       return zstd.Compress(nil, data, defaultCompressionLevel)
+}
+
+// decompressTagData decompresses tag data using zstd decompression.
+func decompressTagData(compressedData []byte) ([]byte, error) {
+       if len(compressedData) == 0 {
+               return nil, nil
+       }
+       return zstd.Decompress(nil, compressedData)
+}
+
+// generateBloomFilter gets a bloom filter from pool or creates new.
+func generateBloomFilter(expectedElements int) *filter.BloomFilter {
+       v := bloomFilterPool.Get()
+       if v == nil {
+               return filter.NewBloomFilter(expectedElements)
+       }
+       // Reset and resize for new expected elements
+       v.SetN(expectedElements)
+       m := expectedElements * filter.B
+       v.ResizeBits((m + 63) / 64)
+       return v
+}
+
+// releaseBloomFilter returns bloom filter to pool after reset.
+func releaseBloomFilter(bf *filter.BloomFilter) {
+       if bf == nil {
+               return
+       }
+       bf.Reset()
+       bloomFilterPool.Put(bf)
+}
+
+// encodeBloomFilter encodes a bloom filter to bytes.
+func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte {
+       if bf == nil {
+               return dst
+       }
+       dst = pkgencoding.Int64ToBytes(dst, int64(bf.N()))
+       dst = pkgencoding.EncodeUint64Block(dst, bf.Bits())
+       return dst
+}
+
+// decodeBloomFilter decodes bytes to bloom filter.
+func decodeBloomFilter(src []byte) (*filter.BloomFilter, error) {
+       if len(src) < 8 {
+               return nil, fmt.Errorf("invalid bloom filter data: too short")
+       }
+
+       n := pkgencoding.BytesToInt64(src)
+       bf := generateBloomFilter(int(n))
+
+       m := n * filter.B
+       bits := make([]uint64, 0, (m+63)/64)
+       var err error
+       bits, _, err = pkgencoding.DecodeUint64Block(bits, src[8:], 
uint64((m+63)/64))
+       if err != nil {
+               releaseBloomFilter(bf)
+               return nil, fmt.Errorf("failed to decode bloom filter bits: 
%w", err)
+       }
+       bf.SetBits(bits)
+
+       return bf, nil
+}
+
+// generateTagFilter creates a bloom filter for indexed tags.
+func generateTagFilter(values [][]byte, expectedElements int) 
*filter.BloomFilter {
+       if len(values) == 0 {
+               return nil
+       }
+
+       bloomFilter := generateBloomFilter(expectedElements)
+
+       for _, value := range values {
+               bloomFilter.Add(value)
+       }
+
+       return bloomFilter
+}
+
+// EncodeTagValues encodes tag values using the shared encoding module.
+func EncodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, 
error) {
+       return encoding.EncodeTagValues(values, valueType)
+}
+
+// DecodeTagValues decodes tag values using the shared encoding module.
+func DecodeTagValues(data []byte, valueType pbv1.ValueType, count int) 
([][]byte, error) {
+       return encoding.DecodeTagValues(data, valueType, count)
+}
+
+// updateMinMax updates min/max values for int64 tags.
+func (td *tagData) updateMinMax() {
+       if td.valueType != pbv1.ValueTypeInt64 || len(td.values) == 0 {
+               return
+       }
+
+       var minVal, maxVal int64
+       first := true
+
+       for _, value := range td.values {
+               if len(value) != 8 {
+                       continue // Skip invalid int64 values
+               }
+
+               val := pkgencoding.BytesToInt64(value)
+
+               if first {
+                       minVal = val
+                       maxVal = val
+                       first = false
+               } else {
+                       if val < minVal {
+                               minVal = val
+                       }
+                       if val > maxVal {
+                               maxVal = val
+                       }
+               }
+       }
+
+       if !first {
+               td.min = pkgencoding.Int64ToBytes(nil, minVal)
+               td.max = pkgencoding.Int64ToBytes(nil, maxVal)
+       }
+}
+
+// addValue adds a value to the tag data.
+func (td *tagData) addValue(value []byte) {
+       td.values = append(td.values, value)
+
+       // Update filter for indexed tags
+       if td.indexed && td.filter != nil {
+               td.filter.Add(value)
+       }
+}
+
+// hasValue checks if a value exists in the tag using the bloom filter.
+func (td *tagData) hasValue(value []byte) bool {
+       if !td.indexed || td.filter == nil {
+               // For non-indexed tags, do linear search
+               for _, v := range td.values {
+                       if bytes.Equal(v, value) {
+                               return true
+                       }
+               }
+               return false
+       }
+
+       return td.filter.MightContain(value)
+}
+
+// marshalTagMetadata serializes tag metadata to bytes.
+func (tm *tagMetadata) marshal() ([]byte, error) {
+       buf := &bytes.Buffer{}
+
+       // Write name length and name
+       nameBytes := []byte(tm.name)
+       if err := binary.Write(buf, binary.LittleEndian, 
uint32(len(nameBytes))); err != nil {
+               return nil, err
+       }
+       if _, err := buf.Write(nameBytes); err != nil {
+               return nil, err
+       }
+
+       // Write value type
+       if err := binary.Write(buf, binary.LittleEndian, uint32(tm.valueType)); 
err != nil {
+               return nil, err
+       }
+
+       // Write data block
+       if err := binary.Write(buf, binary.LittleEndian, tm.dataBlock.offset); 
err != nil {
+               return nil, err
+       }
+       if err := binary.Write(buf, binary.LittleEndian, tm.dataBlock.size); 
err != nil {
+               return nil, err
+       }
+
+       // Write filter block
+       if err := binary.Write(buf, binary.LittleEndian, 
tm.filterBlock.offset); err != nil {
+               return nil, err
+       }
+       if err := binary.Write(buf, binary.LittleEndian, tm.filterBlock.size); 
err != nil {
+               return nil, err
+       }
+
+       // Write flags
+       var flags uint8
+       if tm.indexed {
+               flags |= 1
+       }
+       if tm.compressed {
+               flags |= 2
+       }
+       if err := binary.Write(buf, binary.LittleEndian, flags); err != nil {
+               return nil, err
+       }
+
+       // Write min length and min
+       if err := binary.Write(buf, binary.LittleEndian, uint32(len(tm.min))); 
err != nil {
+               return nil, err
+       }
+       if len(tm.min) > 0 {
+               if _, err := buf.Write(tm.min); err != nil {
+                       return nil, err
+               }
+       }
+
+       // Write max length and max
+       if err := binary.Write(buf, binary.LittleEndian, uint32(len(tm.max))); 
err != nil {
+               return nil, err
+       }
+       if len(tm.max) > 0 {
+               if _, err := buf.Write(tm.max); err != nil {
+                       return nil, err
+               }
+       }
+
+       return buf.Bytes(), nil
+}
+
+// unmarshalTagMetadata deserializes tag metadata from bytes.
+func unmarshalTagMetadata(data []byte) (*tagMetadata, error) {
+       tm := generateTagMetadata()
+       buf := bytes.NewReader(data)
+
+       // Read name
+       var nameLen uint32
+       if err := binary.Read(buf, binary.LittleEndian, &nameLen); err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+       nameBytes := make([]byte, nameLen)
+       if _, err := io.ReadFull(buf, nameBytes); err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+       tm.name = string(nameBytes)
+
+       // Read value type
+       var valueType uint32
+       if err := binary.Read(buf, binary.LittleEndian, &valueType); err != nil 
{
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+       tm.valueType = pbv1.ValueType(valueType)
+
+       // Read data block
+       if err := binary.Read(buf, binary.LittleEndian, &tm.dataBlock.offset); 
err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+       if err := binary.Read(buf, binary.LittleEndian, &tm.dataBlock.size); 
err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+
+       // Read filter block
+       if err := binary.Read(buf, binary.LittleEndian, 
&tm.filterBlock.offset); err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+       if err := binary.Read(buf, binary.LittleEndian, &tm.filterBlock.size); 
err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+
+       // Read flags
+       var flags uint8
+       if err := binary.Read(buf, binary.LittleEndian, &flags); err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+       tm.indexed = (flags & 1) != 0
+       tm.compressed = (flags & 2) != 0
+
+       // Read min
+       var minLen uint32
+       if err := binary.Read(buf, binary.LittleEndian, &minLen); err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+       if minLen > 0 {
+               tm.min = make([]byte, minLen)
+               if _, err := io.ReadFull(buf, tm.min); err != nil {
+                       releaseTagMetadata(tm)
+                       return nil, err
+               }
+       }
+
+       // Read max
+       var maxLen uint32
+       if err := binary.Read(buf, binary.LittleEndian, &maxLen); err != nil {
+               releaseTagMetadata(tm)
+               return nil, err
+       }
+       if maxLen > 0 {
+               tm.max = make([]byte, maxLen)
+               if _, err := io.ReadFull(buf, tm.max); err != nil {
+                       releaseTagMetadata(tm)
+                       return nil, err
+               }
+       }
+
+       return tm, nil
+}
diff --git a/banyand/internal/sidx/tag_test.go 
b/banyand/internal/sidx/tag_test.go
new file mode 100644
index 00000000..5e51dfea
--- /dev/null
+++ b/banyand/internal/sidx/tag_test.go
@@ -0,0 +1,724 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "bytes"
+       "encoding/binary"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+const testTagName = "test_tag"
+
+func TestTagValueMarshaling(t *testing.T) {
+       tests := []struct {
+               name   string
+               values [][]byte
+               want   bool // whether marshaling should succeed
+       }{
+               {
+                       name:   "empty values",
+                       values: [][]byte{},
+                       want:   true,
+               },
+               {
+                       name:   "single value",
+                       values: [][]byte{[]byte("test")},
+                       want:   true,
+               },
+               {
+                       name: "multiple values",
+                       values: [][]byte{
+                               []byte("value1"),
+                               []byte("value2"),
+                               []byte("value3"),
+                       },
+                       want: true,
+               },
+               {
+                       name: "values with different lengths",
+                       values: [][]byte{
+                               []byte("a"),
+                               []byte("longer_value"),
+                               []byte(""),
+                               []byte("medium"),
+                       },
+                       want: true,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Marshal values using shared encoding (default to 
string type for basic marshaling test)
+                       data, err := EncodeTagValues(tt.values, 
pbv1.ValueTypeStr)
+                       if tt.want {
+                               require.NoError(t, err)
+                       } else {
+                               require.Error(t, err)
+                               return
+                       }
+
+                       // Unmarshal and verify
+                       unmarshaled, err := DecodeTagValues(data, 
pbv1.ValueTypeStr, len(tt.values))
+                       require.NoError(t, err)
+                       assert.Equal(t, len(tt.values), len(unmarshaled))
+
+                       for i, expected := range tt.values {
+                               switch {
+                               case expected == nil:
+                                       assert.Nil(t, unmarshaled[i])
+                               case len(expected) == 0:
+                                       // Handle empty byte slices - encoding 
may return nil for empty values
+                                       assert.True(t, len(unmarshaled[i]) == 
0, "Expected empty value at index %d", i)
+                               default:
+                                       assert.Equal(t, expected, 
unmarshaled[i])
+                               }
+                       }
+               })
+       }
+}
+
+func TestTagValueEncoding(t *testing.T) {
+       tests := []struct {
+               name      string
+               values    [][]byte
+               valueType pbv1.ValueType
+               wantErr   bool
+       }{
+               {
+                       name:      "int64 values",
+                       values:    [][]byte{{0x39, 0x30, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x00}}, // int64(12345) encoded
+                       valueType: pbv1.ValueTypeInt64,
+                       wantErr:   false,
+               },
+               {
+                       name:      "string values",
+                       values:    [][]byte{[]byte("test string"), 
[]byte("another string")},
+                       valueType: pbv1.ValueTypeStr,
+                       wantErr:   false,
+               },
+               {
+                       name:      "binary data",
+                       values:    [][]byte{{0x01, 0x02, 0x03, 0x04}, {0xFF, 
0xFE}},
+                       valueType: pbv1.ValueTypeBinaryData,
+                       wantErr:   false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       encoded, err := EncodeTagValues(tt.values, tt.valueType)
+                       if tt.wantErr {
+                               assert.Error(t, err)
+                               return
+                       }
+
+                       require.NoError(t, err)
+                       assert.NotNil(t, encoded)
+
+                       // Decode and verify round-trip
+                       decoded, err := DecodeTagValues(encoded, tt.valueType, 
len(tt.values))
+                       require.NoError(t, err)
+                       assert.Equal(t, len(tt.values), len(decoded))
+                       for i, expected := range tt.values {
+                               assert.Equal(t, expected, decoded[i])
+                       }
+               })
+       }
+}
+
+func TestTagFilterGeneration(t *testing.T) {
+       tests := []struct {
+               name             string
+               values           [][]byte
+               expectedElements int
+               wantNil          bool
+       }{
+               {
+                       name:             "empty values",
+                       values:           [][]byte{},
+                       expectedElements: 10,
+                       wantNil:          true,
+               },
+               {
+                       name: "single value",
+                       values: [][]byte{
+                               []byte("value1"),
+                       },
+                       expectedElements: 10,
+                       wantNil:          false,
+               },
+               {
+                       name: "multiple values",
+                       values: [][]byte{
+                               []byte("value1"),
+                               []byte("value2"),
+                               []byte("value3"),
+                       },
+                       expectedElements: 10,
+                       wantNil:          false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       filter := generateTagFilter(tt.values, 
tt.expectedElements)
+                       if tt.wantNil {
+                               assert.Nil(t, filter)
+                               return
+                       }
+
+                       require.NotNil(t, filter)
+
+                       // Test that all added values might be contained
+                       for _, value := range tt.values {
+                               assert.True(t, filter.MightContain(value))
+                       }
+
+                       // Test that a non-added value might not be contained
+                       // (this could return true due to false positives, but 
we test anyway)
+                       nonExistent := []byte("non_existent_value_12345")
+                       // We can't assert false here due to possible false 
positives
+                       filter.MightContain(nonExistent)
+               })
+       }
+}
+
+func TestTagDataOperations(t *testing.T) {
+       t.Run("add and check values", func(t *testing.T) {
+               td := generateTagData()
+               defer releaseTagData(td)
+
+               td.name = testTagName
+               td.valueType = pbv1.ValueTypeStr
+               td.indexed = true
+               td.filter = generateTagFilter([][]byte{}, 10) // Start with 
empty filter
+
+               // Add values
+               values := [][]byte{
+                       []byte("value1"),
+                       []byte("value2"),
+                       []byte("value3"),
+               }
+
+               for _, value := range values {
+                       td.addValue(value)
+               }
+
+               // Check that all values are present
+               for _, value := range values {
+                       assert.True(t, td.hasValue(value))
+               }
+
+               // Check that a non-existent value might not be present
+               assert.False(t, td.hasValue([]byte("non_existent")))
+       })
+
+       t.Run("update min max for int64", func(t *testing.T) {
+               td := generateTagData()
+               defer releaseTagData(td)
+
+               td.name = "int_tag"
+               td.valueType = pbv1.ValueTypeInt64
+
+               // Add int64 values (encoded as bytes)
+               values := []int64{100, -50, 200, 0, 150}
+               for _, v := range values {
+                       encoded := make([]byte, 8)
+                       binary.LittleEndian.PutUint64(encoded, uint64(v))
+                       td.values = append(td.values, encoded)
+               }
+
+               td.updateMinMax()
+
+               // Verify min and max
+               assert.NotNil(t, td.min)
+               assert.NotNil(t, td.max)
+
+               // Decode and verify values
+               minVal := int64(binary.LittleEndian.Uint64(td.min))
+               assert.Equal(t, int64(-50), minVal)
+
+               maxVal := int64(binary.LittleEndian.Uint64(td.max))
+               assert.Equal(t, int64(200), maxVal)
+       })
+
+       t.Run("reset functionality", func(t *testing.T) {
+               td := generateTagData()
+
+               // Set up tag data
+               td.name = testTagName
+               td.valueType = pbv1.ValueTypeStr
+               td.indexed = true
+               td.values = [][]byte{[]byte("value1"), []byte("value2")}
+               td.filter = generateTagFilter(td.values, 10)
+               td.min = []byte("min")
+               td.max = []byte("max")
+
+               // Reset
+               td.reset()
+
+               // Verify reset
+               assert.Equal(t, "", td.name)
+               assert.Equal(t, pbv1.ValueTypeUnknown, td.valueType)
+               assert.False(t, td.indexed)
+               assert.Equal(t, 0, len(td.values))
+               assert.Nil(t, td.filter)
+               assert.Nil(t, td.min)
+               assert.Nil(t, td.max)
+
+               releaseTagData(td)
+       })
+}
+
+func TestTagMetadataOperations(t *testing.T) {
+       t.Run("marshal and unmarshal", func(t *testing.T) {
+               original := generateTagMetadata()
+               defer releaseTagMetadata(original)
+
+               // Set up metadata
+               original.name = "test_tag"
+               original.valueType = pbv1.ValueTypeInt64
+               original.indexed = true
+               original.compressed = false
+               original.dataBlock = dataBlock{offset: 100, size: 500}
+               original.filterBlock = dataBlock{offset: 600, size: 200}
+               original.min = []byte{0x01, 0x02}
+               original.max = []byte{0xFF, 0xFE}
+
+               // Marshal
+               data, err := original.marshal()
+               require.NoError(t, err)
+               assert.NotNil(t, data)
+
+               // Unmarshal
+               unmarshaled, err := unmarshalTagMetadata(data)
+               require.NoError(t, err)
+               defer releaseTagMetadata(unmarshaled)
+
+               // Verify fields
+               assert.Equal(t, original.name, unmarshaled.name)
+               assert.Equal(t, original.valueType, unmarshaled.valueType)
+               assert.Equal(t, original.indexed, unmarshaled.indexed)
+               assert.Equal(t, original.compressed, unmarshaled.compressed)
+               assert.Equal(t, original.dataBlock, unmarshaled.dataBlock)
+               assert.Equal(t, original.filterBlock, unmarshaled.filterBlock)
+               assert.Equal(t, original.min, unmarshaled.min)
+               assert.Equal(t, original.max, unmarshaled.max)
+       })
+
+       t.Run("marshal empty metadata", func(t *testing.T) {
+               tm := generateTagMetadata()
+               defer releaseTagMetadata(tm)
+
+               tm.name = "empty_tag"
+               tm.valueType = pbv1.ValueTypeStr
+
+               data, err := tm.marshal()
+               require.NoError(t, err)
+
+               unmarshaled, err := unmarshalTagMetadata(data)
+               require.NoError(t, err)
+               defer releaseTagMetadata(unmarshaled)
+
+               assert.Equal(t, tm.name, unmarshaled.name)
+               assert.Equal(t, tm.valueType, unmarshaled.valueType)
+               assert.False(t, unmarshaled.indexed)
+               assert.False(t, unmarshaled.compressed)
+               assert.Nil(t, unmarshaled.min)
+               assert.Nil(t, unmarshaled.max)
+       })
+
+       t.Run("reset functionality", func(t *testing.T) {
+               tm := generateTagMetadata()
+
+               // Set up metadata
+               tm.name = "test_tag"
+               tm.valueType = pbv1.ValueTypeInt64
+               tm.indexed = true
+               tm.compressed = true
+               tm.dataBlock = dataBlock{offset: 100, size: 500}
+               tm.filterBlock = dataBlock{offset: 600, size: 200}
+               tm.min = []byte("min")
+               tm.max = []byte("max")
+
+               // Reset
+               tm.reset()
+
+               // Verify reset
+               assert.Equal(t, "", tm.name)
+               assert.Equal(t, pbv1.ValueTypeUnknown, tm.valueType)
+               assert.False(t, tm.indexed)
+               assert.False(t, tm.compressed)
+               assert.Equal(t, dataBlock{}, tm.dataBlock)
+               assert.Equal(t, dataBlock{}, tm.filterBlock)
+               assert.Nil(t, tm.min)
+               assert.Nil(t, tm.max)
+
+               releaseTagMetadata(tm)
+       })
+}
+
+func TestTagPooling(t *testing.T) {
+       t.Run("tagData pooling", func(t *testing.T) {
+               // Get from pool
+               td1 := generateTagData()
+               assert.NotNil(t, td1)
+
+               // Use and release
+               td1.name = "test"
+               releaseTagData(td1)
+
+               // Get again - should be reset
+               td2 := generateTagData()
+               assert.NotNil(t, td2)
+               assert.Equal(t, "", td2.name) // Should be reset
+
+               releaseTagData(td2)
+       })
+
+       t.Run("tagMetadata pooling", func(t *testing.T) {
+               // Get from pool
+               tm1 := generateTagMetadata()
+               assert.NotNil(t, tm1)
+
+               // Use and release
+               tm1.name = "test"
+               releaseTagMetadata(tm1)
+
+               // Get again - should be reset
+               tm2 := generateTagMetadata()
+               assert.NotNil(t, tm2)
+               assert.Equal(t, "", tm2.name) // Should be reset
+
+               releaseTagMetadata(tm2)
+       })
+
+       t.Run("bloomFilter pooling", func(t *testing.T) {
+               // Generate bloom filter
+               bf1 := generateBloomFilter(100)
+               assert.NotNil(t, bf1)
+               assert.Equal(t, 100, bf1.N())
+
+               // Add some data
+               bf1.Add([]byte("test1"))
+               bf1.Add([]byte("test2"))
+               assert.True(t, bf1.MightContain([]byte("test1")))
+
+               // Release to pool
+               releaseBloomFilter(bf1)
+
+               // Get from pool again
+               bf2 := generateBloomFilter(50)
+               assert.NotNil(t, bf2)
+               assert.Equal(t, 50, bf2.N())                       // Should be 
reset to new size
+               assert.False(t, bf2.MightContain([]byte("test1"))) // Should 
not contain old data
+
+               releaseBloomFilter(bf2)
+       })
+}
+
+func TestEdgeCases(t *testing.T) {
+       t.Run("invalid int64 length", func(t *testing.T) {
+               // The shared encoding module panics on invalid data (fail fast)
+               assert.Panics(t, func() {
+                       invalidValues := [][]byte{{0x01, 0x02, 0x03}} // Only 3 
bytes, not 8
+                       _, _ = EncodeTagValues(invalidValues, 
pbv1.ValueTypeInt64)
+               })
+       })
+
+       t.Run("marshal nil values", func(t *testing.T) {
+               data, err := EncodeTagValues(nil, pbv1.ValueTypeStr)
+               require.NoError(t, err)
+               assert.Nil(t, data)
+
+               values, err := DecodeTagValues(nil, pbv1.ValueTypeStr, 0)
+               require.NoError(t, err)
+               assert.Nil(t, values)
+       })
+
+       t.Run("updateMinMax with empty values", func(t *testing.T) {
+               td := generateTagData()
+               defer releaseTagData(td)
+
+               td.valueType = pbv1.ValueTypeInt64
+               td.values = [][]byte{} // empty
+
+               td.updateMinMax()
+               assert.Nil(t, td.min)
+               assert.Nil(t, td.max)
+       })
+
+       t.Run("updateMinMax with non-int64 type", func(t *testing.T) {
+               td := generateTagData()
+               defer releaseTagData(td)
+
+               td.valueType = pbv1.ValueTypeStr
+               td.values = [][]byte{[]byte("test")}
+
+               td.updateMinMax()
+               assert.Nil(t, td.min)
+               assert.Nil(t, td.max)
+       })
+
+       t.Run("hasValue with non-indexed tag", func(t *testing.T) {
+               td := generateTagData()
+               defer releaseTagData(td)
+
+               td.indexed = false
+               td.values = [][]byte{[]byte("value1"), []byte("value2")}
+
+               // Should use linear search
+               assert.True(t, td.hasValue([]byte("value1")))
+               assert.True(t, td.hasValue([]byte("value2")))
+               assert.False(t, td.hasValue([]byte("value3")))
+       })
+}
+
+func TestRoundTripIntegrity(t *testing.T) {
+       t.Run("complete round trip", func(t *testing.T) {
+               // Create original tag metadata
+               original := generateTagMetadata()
+               defer releaseTagMetadata(original)
+
+               original.name = "integration_tag"
+               original.valueType = pbv1.ValueTypeInt64
+               original.indexed = true
+               original.compressed = true
+               original.dataBlock = dataBlock{offset: 1000, size: 2000}
+               original.filterBlock = dataBlock{offset: 3000, size: 500}
+
+               // Create some int64 values
+               int64Values := []int64{-100, 0, 100, 200, 50}
+               var encodedValues [][]byte
+               for _, v := range int64Values {
+                       encoded := make([]byte, 8)
+                       binary.LittleEndian.PutUint64(encoded, uint64(v))
+                       encodedValues = append(encodedValues, encoded)
+               }
+
+               // Marshal values using shared encoding
+               marshaledValues, err := EncodeTagValues(encodedValues, 
pbv1.ValueTypeInt64)
+               require.NoError(t, err)
+
+               // Marshal metadata
+               marshaledMetadata, err := original.marshal()
+               require.NoError(t, err)
+
+               // Unmarshal metadata
+               unmarshaledMetadata, err := 
unmarshalTagMetadata(marshaledMetadata)
+               require.NoError(t, err)
+               defer releaseTagMetadata(unmarshaledMetadata)
+
+               // Unmarshal values using shared encoding
+               unmarshaledValues, err := DecodeTagValues(marshaledValues, 
pbv1.ValueTypeInt64, len(encodedValues))
+               require.NoError(t, err)
+
+               // Verify metadata integrity
+               assert.Equal(t, original.name, unmarshaledMetadata.name)
+               assert.Equal(t, original.valueType, 
unmarshaledMetadata.valueType)
+               assert.Equal(t, original.indexed, unmarshaledMetadata.indexed)
+               assert.Equal(t, original.compressed, 
unmarshaledMetadata.compressed)
+
+               // Verify values integrity
+               assert.Equal(t, len(encodedValues), len(unmarshaledValues))
+               for i, original := range encodedValues {
+                       assert.True(t, bytes.Equal(original, 
unmarshaledValues[i]))
+               }
+
+               // Decode and verify int64 values
+               for i, expected := range int64Values {
+                       decoded := 
int64(binary.LittleEndian.Uint64(unmarshaledValues[i]))
+                       assert.Equal(t, expected, decoded)
+               }
+       })
+}
+
+func TestTagCompression(t *testing.T) {
+       t.Run("compress and decompress tag data", func(t *testing.T) {
+               originalData := []byte("this is some test data that should be 
compressed and decompressed properly")
+
+               // Compress
+               compressed := compressTagData(originalData)
+               assert.NotNil(t, compressed)
+               assert.NotEqual(t, originalData, compressed)
+
+               // Decompress
+               decompressed, err := decompressTagData(compressed)
+               require.NoError(t, err)
+               assert.Equal(t, originalData, decompressed)
+       })
+
+       t.Run("compress empty data", func(t *testing.T) {
+               compressed := compressTagData(nil)
+               assert.Nil(t, compressed)
+
+               compressed = compressTagData([]byte{})
+               assert.Nil(t, compressed)
+       })
+
+       t.Run("decompress empty data", func(t *testing.T) {
+               decompressed, err := decompressTagData(nil)
+               require.NoError(t, err)
+               assert.Nil(t, decompressed)
+
+               decompressed, err = decompressTagData([]byte{})
+               require.NoError(t, err)
+               assert.Nil(t, decompressed)
+       })
+}
+
+func TestTagValuesCompression(t *testing.T) {
+       t.Run("encode and decode compressed values", func(t *testing.T) {
+               values := [][]byte{
+                       []byte("this is a longer string that should compress 
well"),
+                       []byte("another long string with repeated words 
repeated words"),
+                       []byte("yet another string for compression testing 
purposes"),
+               }
+
+               // Encode with automatic compression for string data
+               compressed, err := EncodeTagValues(values, pbv1.ValueTypeStr)
+               require.NoError(t, err)
+               assert.NotNil(t, compressed)
+
+               // Decode compressed data
+               decompressed, err := DecodeTagValues(compressed, 
pbv1.ValueTypeStr, len(values))
+               require.NoError(t, err)
+               assert.Equal(t, len(values), len(decompressed))
+
+               for i, expected := range values {
+                       assert.Equal(t, expected, decompressed[i])
+               }
+       })
+
+       t.Run("compression works for repetitive data", func(t *testing.T) {
+               // Create repetitive data that should compress well
+               repetitiveData := make([][]byte, 100)
+               for i := range repetitiveData {
+                       repetitiveData[i] = 
[]byte("repeated_data_pattern_that_compresses_well")
+               }
+
+               // Encode with automatic compression
+               compressed, err := EncodeTagValues(repetitiveData, 
pbv1.ValueTypeStr)
+               require.NoError(t, err)
+
+               // Verify decompression works
+               decompressed, err := DecodeTagValues(compressed, 
pbv1.ValueTypeStr, len(repetitiveData))
+               require.NoError(t, err)
+               assert.Equal(t, repetitiveData, decompressed)
+       })
+}
+
+func TestCompressionRoundTrip(t *testing.T) {
+       testCases := []struct {
+               name   string
+               values [][]byte
+       }{
+               {
+                       name:   "small values",
+                       values: [][]byte{[]byte("a"), []byte("b"), []byte("c")},
+               },
+               {
+                       name: "large values",
+                       values: [][]byte{
+                               []byte("this is a very long string that 
contains a lot of data and should compress well when using zstd compression 
algorithm"),
+                               []byte("another long string with different 
content but still should benefit from compression due to common patterns"),
+                       },
+               },
+               {
+                       name: "mixed size values",
+                       values: [][]byte{
+                               []byte("short"),
+                               []byte("this is a medium length string"),
+                               []byte("this is a very very very long string 
that goes on and on with lots of repeated words and patterns that compression 
algorithms love to work with"),
+                               []byte("x"),
+                       },
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       // Test round trip: values -> encoded -> decoded
+                       encoded, err := EncodeTagValues(tc.values, 
pbv1.ValueTypeStr)
+                       require.NoError(t, err)
+
+                       decoded, err := DecodeTagValues(encoded, 
pbv1.ValueTypeStr, len(tc.values))
+                       require.NoError(t, err)
+
+                       assert.Equal(t, tc.values, decoded)
+               })
+       }
+}
+
+func TestBloomFilterEncoding(t *testing.T) {
+       t.Run("encode and decode bloom filter", func(t *testing.T) {
+               // Create a bloom filter and add some data
+               bf := generateBloomFilter(100)
+               defer releaseBloomFilter(bf)
+
+               testValues := [][]byte{
+                       []byte("value1"),
+                       []byte("value2"),
+                       []byte("value3"),
+               }
+
+               for _, value := range testValues {
+                       bf.Add(value)
+               }
+
+               // Encode the bloom filter
+               var encoded []byte
+               encoded = encodeBloomFilter(encoded, bf)
+               assert.NotEmpty(t, encoded)
+
+               // Decode the bloom filter
+               decodedBf, err := decodeBloomFilter(encoded)
+               require.NoError(t, err)
+               defer releaseBloomFilter(decodedBf)
+
+               // Verify the decoded filter contains the same data
+               assert.Equal(t, bf.N(), decodedBf.N())
+               for _, value := range testValues {
+                       assert.True(t, decodedBf.MightContain(value))
+               }
+       })
+
+       t.Run("encode empty bloom filter", func(t *testing.T) {
+               var encoded []byte
+               encoded = encodeBloomFilter(encoded, nil)
+               assert.Empty(t, encoded)
+       })
+
+       t.Run("decode invalid data", func(t *testing.T) {
+               // Test with data too short
+               invalidData := []byte{0x01, 0x02}
+               _, err := decodeBloomFilter(invalidData)
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "too short")
+
+               // Test with empty data
+               _, err = decodeBloomFilter([]byte{})
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "too short")
+       })
+}
diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go
index 4ad42dd4..f1751313 100644
--- a/banyand/stream/tag.go
+++ b/banyand/stream/tag.go
@@ -18,70 +18,12 @@
 package stream
 
 import (
+       "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-       "github.com/apache/skywalking-banyandb/pkg/pool"
-)
-
-func generateInt64Slice(length int) *[]int64 {
-       v := int64SlicePool.Get()
-       if v == nil {
-               s := make([]int64, length)
-               return &s
-       }
-       if cap(*v) < length {
-               *v = make([]int64, length)
-       } else {
-               *v = (*v)[:length]
-       }
-       return v
-}
-
-func releaseInt64Slice(int64Slice *[]int64) {
-       *int64Slice = (*int64Slice)[:0]
-       int64SlicePool.Put(int64Slice)
-}
-
-func generateFloat64Slice(length int) *[]float64 {
-       v := float64SlicePool.Get()
-       if v == nil {
-               s := make([]float64, length)
-               return &s
-       }
-       if cap(*v) < length {
-               *v = make([]float64, length)
-       } else {
-               *v = (*v)[:length]
-       }
-       return v
-}
-
-func releaseFloat64Slice(float64Slice *[]float64) {
-       *float64Slice = (*float64Slice)[:0]
-       float64SlicePool.Put(float64Slice)
-}
-
-func generateDictionary() *encoding.Dictionary {
-       v := dictionaryPool.Get()
-       if v == nil {
-               return &encoding.Dictionary{}
-       }
-       return v
-}
-
-func releaseDictionary(d *encoding.Dictionary) {
-       d.Reset()
-       dictionaryPool.Put(d)
-}
-
-var (
-       int64SlicePool   = pool.Register[*[]int64]("stream-int64Slice")
-       float64SlicePool = pool.Register[*[]float64]("stream-float64Slice")
-       dictionaryPool   = 
pool.Register[*encoding.Dictionary]("stream-dictionary")
 )
 
 type tag struct {
@@ -119,27 +61,22 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter 
*writer, tagFilterWriter *w
        tm.name = t.name
        tm.valueType = t.valueType
 
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
-
-       // select encoding based on data type
-       switch t.valueType {
-       case pbv1.ValueTypeInt64:
-               t.encodeInt64Tag(bb)
-       case pbv1.ValueTypeFloat64:
-               t.encodeFloat64Tag(bb)
-       default:
-               t.encodeDefault(bb)
+       // Use shared encoding module
+       encodedData, err := encoding.EncodeTagValues(t.values, t.valueType)
+       if err != nil {
+               logger.Panicf("failed to encode tag values: %v", err)
        }
-       tm.size = uint64(len(bb.Buf))
+
+       tm.size = uint64(len(encodedData))
        if tm.size > maxValuesBlockSize {
                logger.Panicf("too large valuesSize: %d bytes; mustn't exceed 
%d bytes", tm.size, maxValuesBlockSize)
        }
        tm.offset = tagWriter.bytesWritten
-       tagWriter.MustWrite(bb.Buf)
+       tagWriter.MustWrite(encodedData)
 
        if t.filter != nil {
-               bb.Reset()
+               bb := bigValuePool.Generate()
+               defer bigValuePool.Release(bb)
                bb.Buf = encodeBloomFilter(bb.Buf[:0], t.filter)
                if tm.valueType == pbv1.ValueTypeInt64 {
                        tm.min = t.min
@@ -151,125 +88,33 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter 
*writer, tagFilterWriter *w
        }
 }
 
-func (t *tag) encodeInt64Tag(bb *bytes.Buffer) {
-       // convert byte array to int64 array
-       intValuesPtr := generateInt64Slice(len(t.values))
-       intValues := *intValuesPtr
-       defer releaseInt64Slice(intValuesPtr)
-       var encodeType encoding.EncodeType
-
-       for i, v := range t.values {
-               if v == nil || string(v) == "null" {
-                       t.encodeDefault(bb)
-                       encodeType = encoding.EncodeTypePlain
-                       // Prepend encodeType (1 byte) to the beginning
-                       bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...)
-                       return
-               }
-               if len(v) != 8 {
-                       logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
-               }
-               intValues[i] = convert.BytesToInt64(v)
-       }
-       // use delta encoding for integer column
-       var firstValue int64
-       bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], 
intValues)
-       if encodeType == encoding.EncodeTypeUnknown {
-               logger.Panicf("invalid encode type for int64 values")
-       }
-       firstValueBytes := convert.Int64ToBytes(firstValue)
-       // Prepend encodeType (1 byte) and firstValue (8 bytes) to the beginning
-       bb.Buf = append(
-               append([]byte{byte(encodeType)}, firstValueBytes...),
-               bb.Buf...,
-       )
-}
-
-func (t *tag) encodeFloat64Tag(bb *bytes.Buffer) {
-       // convert byte array to float64 array
-       intValuesPtr := generateInt64Slice(len(t.values))
-       intValues := *intValuesPtr
-       defer releaseInt64Slice(intValuesPtr)
-
-       floatValuesPtr := generateFloat64Slice(len(t.values))
-       floatValues := *floatValuesPtr
-       defer releaseFloat64Slice(floatValuesPtr)
-
-       var encodeType encoding.EncodeType
-
-       doEncodeDefault := func() {
-               t.encodeDefault(bb)
-               encodeType = encoding.EncodeTypePlain
-               // Prepend encodeType (1 byte) to the beginning
-               bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...)
-       }
-
-       for i, v := range t.values {
-               if v == nil || string(v) == "null" {
-                       doEncodeDefault()
-                       return
-               }
-               if len(v) != 8 {
-                       logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
-               }
-               floatValues[i] = convert.BytesToFloat64(v)
-       }
-       intValues, exp, err := 
encoding.Float64ListToDecimalIntList(intValues[:0], floatValues)
-       if err != nil {
-               logger.Errorf("cannot convert Float64List to DecimalIntList : 
%v", err)
-               doEncodeDefault()
-               return
-       }
-       var firstValue int64
-       bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], 
intValues)
-       if encodeType == encoding.EncodeTypeUnknown {
-               logger.Panicf("invalid encode type for int64 values")
-       }
-       firstValueBytes := convert.Int64ToBytes(firstValue)
-       expBytes := convert.Int16ToBytes(exp)
-       // Prepend encodeType (1 byte), exp (2 bytes) and firstValue (8 bytes) 
to the beginning
-       bb.Buf = append(
-               append(append([]byte{byte(encodeType)}, expBytes...), 
firstValueBytes...),
-               bb.Buf...,
-       )
-}
-
-func (t *tag) encodeDefault(bb *bytes.Buffer) {
-       dict := generateDictionary()
-       defer releaseDictionary(dict)
-       for _, v := range t.values {
-               if !dict.Add(v) {
-                       bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], t.values)
-                       bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
-                       return
-               }
-       }
-       bb.Buf = dict.Encode(bb.Buf[:0])
-       bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...)
-}
-
-func (t *tag) mustReadValues(decoder *encoding.BytesBlockDecoder, reader 
fs.Reader, cm tagMetadata, count uint64) {
+func (t *tag) mustReadValues(_ *pkgencoding.BytesBlockDecoder, reader 
fs.Reader, cm tagMetadata, count uint64) {
        t.name = cm.name
        t.valueType = cm.valueType
        if t.valueType == pbv1.ValueTypeUnknown {
-               for i := uint64(0); i < count; i++ {
+               for range count {
                        t.values = append(t.values, nil)
                }
                return
        }
 
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
        valuesSize := cm.size
        if valuesSize > maxValuesBlockSize {
                logger.Panicf("%s: block size cannot exceed %d bytes; got %d 
bytes", reader.Path(), maxValuesBlockSize, valuesSize)
        }
-       bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize))
-       fs.MustReadData(reader, int64(cm.offset), bb.Buf)
-       t.decodeTagValues(decoder, reader.Path(), count, bb)
+
+       data := make([]byte, valuesSize)
+       fs.MustReadData(reader, int64(cm.offset), data)
+
+       // Use shared decoding module
+       decodedValues, err := encoding.DecodeTagValues(data, t.valueType, 
int(count))
+       if err != nil {
+               logger.Panicf("%s: failed to decode tag values: %v", 
reader.Path(), err)
+       }
+       t.values = decodedValues
 }
 
-func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader 
*seqReader, cm tagMetadata, count uint64) {
+func (t *tag) mustSeqReadValues(_ *pkgencoding.BytesBlockDecoder, reader 
*seqReader, cm tagMetadata, count uint64) {
        t.name = cm.name
        t.valueType = cm.valueType
        if cm.offset != reader.bytesRead {
@@ -280,117 +125,15 @@ func (t *tag) mustSeqReadValues(decoder 
*encoding.BytesBlockDecoder, reader *seq
                logger.Panicf("%s: block size cannot exceed %d bytes; got %d 
bytes", reader.Path(), maxValuesBlockSize, valuesSize)
        }
 
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
+       data := make([]byte, valuesSize)
+       reader.mustReadFull(data)
 
-       bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize))
-       reader.mustReadFull(bb.Buf)
-       t.decodeTagValues(decoder, reader.Path(), count, bb)
-}
-
-func (t *tag) decodeTagValues(decoder *encoding.BytesBlockDecoder, path 
string, count uint64, bb *bytes.Buffer) {
-       switch t.valueType {
-       case pbv1.ValueTypeInt64:
-               t.decodeInt64Tag(decoder, path, count, bb)
-       case pbv1.ValueTypeFloat64:
-               t.decodeFloat64Tag(decoder, path, count, bb)
-       default:
-               t.decodeDefault(decoder, bb, count, path)
-       }
-}
-
-func (t *tag) decodeInt64Tag(decoder *encoding.BytesBlockDecoder, path string, 
count uint64, bb *bytes.Buffer) {
-       // decode integer type
-       intValuesPtr := generateInt64Slice(int(count))
-       intValues := *intValuesPtr
-       defer releaseInt64Slice(intValuesPtr)
-
-       if len(bb.Buf) < 1 {
-               logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", 1, len(bb.Buf))
-       }
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       if encodeType == encoding.EncodeTypePlain {
-               bb.Buf = bb.Buf[1:]
-               t.decodeDefault(decoder, bb, count, path)
-               return
-       }
-
-       const expectedLen = 9
-       if len(bb.Buf) < expectedLen {
-               logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", expectedLen, len(bb.Buf))
-       }
-       firstValue := convert.BytesToInt64(bb.Buf[1:9])
-       bb.Buf = bb.Buf[9:]
-       var err error
-       intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, 
encodeType, firstValue, int(count))
-       if err != nil {
-               logger.Panicf("%s: cannot decode int values: %v", path, err)
-       }
-       // convert int64 array to byte array
-       t.values = make([][]byte, count)
-       for i, v := range intValues {
-               t.values[i] = convert.Int64ToBytes(v)
-       }
-}
-
-func (t *tag) decodeFloat64Tag(decoder *encoding.BytesBlockDecoder, path 
string, count uint64, bb *bytes.Buffer) {
-       // decode float type
-       intValuesPtr := generateInt64Slice(int(count))
-       intValues := *intValuesPtr
-       defer releaseInt64Slice(intValuesPtr)
-       floatValuesPtr := generateFloat64Slice(int(count))
-       floatValues := *floatValuesPtr
-       defer releaseFloat64Slice(floatValuesPtr)
-
-       if len(bb.Buf) < 1 {
-               logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", 1, len(bb.Buf))
-       }
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       if encodeType == encoding.EncodeTypePlain {
-               bb.Buf = bb.Buf[1:]
-               t.decodeDefault(decoder, bb, count, path)
-               return
-       }
-
-       const expectedLen = 11
-       if len(bb.Buf) < expectedLen {
-               logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", expectedLen, len(bb.Buf))
-       }
-       exp := convert.BytesToInt16(bb.Buf[1:3])
-       firstValue := convert.BytesToInt64(bb.Buf[3:11])
-       bb.Buf = bb.Buf[11:]
-       var err error
-       intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, 
encodeType, firstValue, int(count))
-       if err != nil {
-               logger.Panicf("%s: cannot decode int values: %v", path, err)
-       }
-       floatValues, err = 
encoding.DecimalIntListToFloat64List(floatValues[:0], intValues, exp, 
int(count))
-       if err != nil {
-               logger.Panicf("cannot convert DecimalIntList to Float64List: 
%v", err)
-       }
-       if uint64(len(floatValues)) != count {
-               logger.Panicf("unexpected floatValues length: got %d, expected 
%d", len(floatValues), count)
-       }
-       // convert float64 array to byte array
-       t.values = make([][]byte, count)
-       for i, v := range floatValues {
-               t.values[i] = convert.Float64ToBytes(v)
-       }
-}
-
-func (t *tag) decodeDefault(decoder *encoding.BytesBlockDecoder, bb 
*bytes.Buffer, count uint64, path string) {
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       var err error
-       if encodeType == encoding.EncodeTypeDictionary {
-               dict := generateDictionary()
-               defer releaseDictionary(dict)
-               t.values, err = dict.Decode(t.values[:0], bb.Buf[1:], count)
-       } else {
-               t.values, err = decoder.Decode(t.values[:0], bb.Buf[1:], count)
-       }
+       // Use shared decoding module
+       decodedValues, err := encoding.DecodeTagValues(data, t.valueType, 
int(count))
        if err != nil {
-               logger.Panicf("%s: cannot decode values: %v", path, err)
+               logger.Panicf("%s: failed to decode tag values: %v", 
reader.Path(), err)
        }
+       t.values = decodedValues
 }
 
 var bigValuePool = bytes.NewBufferPool("stream-big-value")
diff --git a/banyand/stream/tag_test.go b/banyand/stream/tag_test.go
index 1416bbd5..acb0e08f 100644
--- a/banyand/stream/tag_test.go
+++ b/banyand/stream/tag_test.go
@@ -240,12 +240,9 @@ func TestTag_HighCardinalityStringEncoding(t *testing.T) {
                        assert.Equal(t, testTag.name, tm.name)
                        assert.Equal(t, testTag.valueType, tm.valueType)
 
-                       // Check encoding type by examining the first byte of 
the encoded data
+                       // The new shared encoding uses automatic compression, 
so we can't directly check the encoding type
+                       // from the first byte anymore. Instead, we verify the 
round-trip works correctly.
                        assert.True(t, len(buf.Buf) > 0, "Encoded buffer should 
not be empty")
-                       actualEncType := encoding.EncodeType(buf.Buf[0])
-                       assert.Equal(t, tt.expectedEncType, actualEncType,
-                               "Expected %s encoding (%d), got %d. %s",
-                               getEncodeTypeName(tt.expectedEncType), 
tt.expectedEncType, actualEncType, tt.description)
 
                        // Test roundtrip: decode and verify all values are 
preserved
                        decoder := &encoding.BytesBlockDecoder{}
@@ -264,15 +261,3 @@ func TestTag_HighCardinalityStringEncoding(t *testing.T) {
                })
        }
 }
-
-// Helper function to get encode type name for better test output.
-func getEncodeTypeName(encType encoding.EncodeType) string {
-       switch encType {
-       case encoding.EncodeTypePlain:
-               return "Plain"
-       case encoding.EncodeTypeDictionary:
-               return "Dictionary"
-       default:
-               return fmt.Sprintf("Unknown(%d)", encType)
-       }
-}

Reply via email to