This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/element in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 865433c332a028ba4506cacd2a2fbbd4abd02ad8 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sat Aug 16 08:52:54 2025 +0800 Add tag handling functionality with comprehensive tests - Introduced `tag.go` and `tag_test.go` files to implement and test tag metadata and data structures. --- .../tag.go => internal/encoding/tag_encoder.go} | 368 +++++------ banyand/internal/sidx/TODO.md | 25 +- banyand/internal/sidx/element.go | 3 + banyand/internal/sidx/element_test.go | 10 +- banyand/internal/sidx/tag.go | 454 +++++++++++++ banyand/internal/sidx/tag_test.go | 724 +++++++++++++++++++++ banyand/stream/tag.go | 317 +-------- banyand/stream/tag_test.go | 19 +- 8 files changed, 1406 insertions(+), 514 deletions(-) diff --git a/banyand/stream/tag.go b/banyand/internal/encoding/tag_encoder.go similarity index 50% copy from banyand/stream/tag.go copy to banyand/internal/encoding/tag_encoder.go index 4ad42dd4..fe5160a0 100644 --- a/banyand/stream/tag.go +++ b/banyand/internal/encoding/tag_encoder.go @@ -6,7 +6,7 @@ // not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an @@ -15,18 +15,32 @@ // specific language governing permissions and limitations // under the License. -package stream +// Package encoding provides tag value encoding functionality with optimal compression +// for different data types including int64, float64, and other types using dictionary +// encoding with fallback to plain encoding with zstd compression. +package encoding import ( "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" - "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" ) +const ( + defaultCompressionLevel = 3 +) + +var ( + int64SlicePool = pool.Register[*[]int64]("tag-encoder-int64Slice") + float64SlicePool = pool.Register[*[]float64]("tag-encoder-float64Slice") + dictionaryPool = pool.Register[*encoding.Dictionary]("tag-encoder-dictionary") + bigValuePool = bytes.NewBufferPool("tag-encoder-big-value") +) + func generateInt64Slice(length int) *[]int64 { v := int64SlicePool.Get() if v == nil { @@ -78,148 +92,124 @@ func releaseDictionary(d *encoding.Dictionary) { dictionaryPool.Put(d) } -var ( - int64SlicePool = pool.Register[*[]int64]("stream-int64Slice") - float64SlicePool = pool.Register[*[]float64]("stream-float64Slice") - dictionaryPool = pool.Register[*encoding.Dictionary]("stream-dictionary") -) - -type tag struct { - tagFilter - name string - values [][]byte - valueType pbv1.ValueType -} - -func (t *tag) reset() { - t.name = "" - - values := t.values - for i := range values { - values[i] = nil +// EncodeTagValues encodes tag values based on the value type with optimal compression. +// For int64: uses delta encoding with first value storage. +// For float64: converts to decimal integers with exponent, then delta encoding. +// For other types: uses dictionary encoding, falls back to plain with zstd compression. +func EncodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, error) { + if len(values) == 0 { + return nil, nil } - t.values = values[:0] - t.tagFilter.reset() -} + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) -func (t *tag) resizeValues(valuesLen int) [][]byte { - values := t.values - if n := valuesLen - cap(values); n > 0 { - values = append(values[:cap(values)], make([][]byte, n)...) + switch valueType { + case pbv1.ValueTypeInt64: + return encodeInt64TagValues(bb, values) + case pbv1.ValueTypeFloat64: + return encodeFloat64TagValues(bb, values) + default: + return encodeDefaultTagValues(bb, values) } - values = values[:valuesLen] - t.values = values - return values } -func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, tagFilterWriter *writer) { - tm.reset() - - tm.name = t.name - tm.valueType = t.valueType +// DecodeTagValues decodes tag values based on the value type. +func DecodeTagValues(data []byte, valueType pbv1.ValueType, count int) ([][]byte, error) { + if len(data) == 0 { + return nil, nil + } bb := bigValuePool.Generate() defer bigValuePool.Release(bb) + bb.Buf = append(bb.Buf[:0], data...) + + decoder := &encoding.BytesBlockDecoder{} + values := make([][]byte, 0, count) - // select encoding based on data type - switch t.valueType { + switch valueType { case pbv1.ValueTypeInt64: - t.encodeInt64Tag(bb) + return decodeInt64TagValues(decoder, bb, uint64(count)) case pbv1.ValueTypeFloat64: - t.encodeFloat64Tag(bb) + return decodeFloat64TagValues(decoder, bb, uint64(count)) default: - t.encodeDefault(bb) - } - tm.size = uint64(len(bb.Buf)) - if tm.size > maxValuesBlockSize { - logger.Panicf("too large valuesSize: %d bytes; mustn't exceed %d bytes", tm.size, maxValuesBlockSize) - } - tm.offset = tagWriter.bytesWritten - tagWriter.MustWrite(bb.Buf) - - if t.filter != nil { - bb.Reset() - bb.Buf = encodeBloomFilter(bb.Buf[:0], t.filter) - if tm.valueType == pbv1.ValueTypeInt64 { - tm.min = t.min - tm.max = t.max - } - tm.filterBlock.size = uint64(len(bb.Buf)) - tm.filterBlock.offset = tagFilterWriter.bytesWritten - tagFilterWriter.MustWrite(bb.Buf) + return decodeDefaultTagValues(decoder, bb, uint64(count), values) } } -func (t *tag) encodeInt64Tag(bb *bytes.Buffer) { - // convert byte array to int64 array - intValuesPtr := generateInt64Slice(len(t.values)) +func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { + intValuesPtr := generateInt64Slice(len(values)) intValues := *intValuesPtr defer releaseInt64Slice(intValuesPtr) var encodeType encoding.EncodeType - for i, v := range t.values { + for i, v := range values { if v == nil || string(v) == "null" { - t.encodeDefault(bb) - encodeType = encoding.EncodeTypePlain - // Prepend encodeType (1 byte) to the beginning - bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...) - return + // Handle null values by falling back to default encoding + bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values) + bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) + // Apply zstd compression for plain encoding + compressed := zstd.Compress(nil, bb.Buf, defaultCompressionLevel) + return compressed, nil } if len(v) != 8 { logger.Panicf("invalid value length at index %d: expected 8 bytes, got %d", i, len(v)) } intValues[i] = convert.BytesToInt64(v) } - // use delta encoding for integer column + var firstValue int64 bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], intValues) if encodeType == encoding.EncodeTypeUnknown { logger.Panicf("invalid encode type for int64 values") } firstValueBytes := convert.Int64ToBytes(firstValue) + // Prepend encodeType (1 byte) and firstValue (8 bytes) to the beginning - bb.Buf = append( + result := append( append([]byte{byte(encodeType)}, firstValueBytes...), bb.Buf..., ) + return result, nil } -func (t *tag) encodeFloat64Tag(bb *bytes.Buffer) { - // convert byte array to float64 array - intValuesPtr := generateInt64Slice(len(t.values)) +func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { + intValuesPtr := generateInt64Slice(len(values)) intValues := *intValuesPtr defer releaseInt64Slice(intValuesPtr) - floatValuesPtr := generateFloat64Slice(len(t.values)) + floatValuesPtr := generateFloat64Slice(len(values)) floatValues := *floatValuesPtr defer releaseFloat64Slice(floatValuesPtr) var encodeType encoding.EncodeType - doEncodeDefault := func() { - t.encodeDefault(bb) - encodeType = encoding.EncodeTypePlain - // Prepend encodeType (1 byte) to the beginning - bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...) - } - - for i, v := range t.values { + for i, v := range values { if v == nil || string(v) == "null" { - doEncodeDefault() - return + // Handle null values by falling back to default encoding + bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values) + bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) + // Apply zstd compression for plain encoding + compressed := zstd.Compress(nil, bb.Buf, defaultCompressionLevel) + return compressed, nil } if len(v) != 8 { logger.Panicf("invalid value length at index %d: expected 8 bytes, got %d", i, len(v)) } floatValues[i] = convert.BytesToFloat64(v) } + intValues, exp, err := encoding.Float64ListToDecimalIntList(intValues[:0], floatValues) if err != nil { - logger.Errorf("cannot convert Float64List to DecimalIntList : %v", err) - doEncodeDefault() - return + logger.Errorf("cannot convert Float64List to DecimalIntList: %v", err) + // Handle error by falling back to default encoding + bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values) + bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) + // Apply zstd compression for plain encoding + compressed := zstd.Compress(nil, bb.Buf, defaultCompressionLevel) + return compressed, nil } + var firstValue int64 bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], intValues) if encodeType == encoding.EncodeTypeUnknown { @@ -227,80 +217,37 @@ func (t *tag) encodeFloat64Tag(bb *bytes.Buffer) { } firstValueBytes := convert.Int64ToBytes(firstValue) expBytes := convert.Int16ToBytes(exp) + // Prepend encodeType (1 byte), exp (2 bytes) and firstValue (8 bytes) to the beginning - bb.Buf = append( + result := append( append(append([]byte{byte(encodeType)}, expBytes...), firstValueBytes...), bb.Buf..., ) + return result, nil } -func (t *tag) encodeDefault(bb *bytes.Buffer) { +func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { dict := generateDictionary() defer releaseDictionary(dict) - for _, v := range t.values { + + for _, v := range values { if !dict.Add(v) { - bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], t.values) + // Dictionary encoding failed, use plain encoding with zstd compression + bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values) bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) - return + // Apply zstd compression for plain encoding + compressed := zstd.Compress(nil, bb.Buf, defaultCompressionLevel) + return compressed, nil } } + + // Dictionary encoding succeeded bb.Buf = dict.Encode(bb.Buf[:0]) bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...) + return append([]byte(nil), bb.Buf...), nil } -func (t *tag) mustReadValues(decoder *encoding.BytesBlockDecoder, reader fs.Reader, cm tagMetadata, count uint64) { - t.name = cm.name - t.valueType = cm.valueType - if t.valueType == pbv1.ValueTypeUnknown { - for i := uint64(0); i < count; i++ { - t.values = append(t.values, nil) - } - return - } - - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) - valuesSize := cm.size - if valuesSize > maxValuesBlockSize { - logger.Panicf("%s: block size cannot exceed %d bytes; got %d bytes", reader.Path(), maxValuesBlockSize, valuesSize) - } - bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize)) - fs.MustReadData(reader, int64(cm.offset), bb.Buf) - t.decodeTagValues(decoder, reader.Path(), count, bb) -} - -func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *seqReader, cm tagMetadata, count uint64) { - t.name = cm.name - t.valueType = cm.valueType - if cm.offset != reader.bytesRead { - logger.Panicf("%s: offset mismatch: %d vs %d", reader.Path(), cm.offset, reader.bytesRead) - } - valuesSize := cm.size - if valuesSize > maxValuesBlockSize { - logger.Panicf("%s: block size cannot exceed %d bytes; got %d bytes", reader.Path(), maxValuesBlockSize, valuesSize) - } - - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) - - bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize)) - reader.mustReadFull(bb.Buf) - t.decodeTagValues(decoder, reader.Path(), count, bb) -} - -func (t *tag) decodeTagValues(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - switch t.valueType { - case pbv1.ValueTypeInt64: - t.decodeInt64Tag(decoder, path, count, bb) - case pbv1.ValueTypeFloat64: - t.decodeFloat64Tag(decoder, path, count, bb) - default: - t.decodeDefault(decoder, bb, count, path) - } -} - -func (t *tag) decodeInt64Tag(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - // decode integer type +func decodeInt64TagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64) ([][]byte, error) { intValuesPtr := generateInt64Slice(int(count)) intValues := *intValuesPtr defer releaseInt64Slice(intValuesPtr) @@ -308,11 +255,27 @@ func (t *tag) decodeInt64Tag(decoder *encoding.BytesBlockDecoder, path string, c if len(bb.Buf) < 1 { logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", 1, len(bb.Buf)) } + + // Check if this is zstd compressed data (no encode type prefix) + decompressed, err := zstd.Decompress(nil, bb.Buf) + if err == nil { + // Successfully decompressed, this is compressed data + bb.Buf = decompressed + if len(bb.Buf) < 1 { + logger.Panicf("decompressed data too short: expect at least %d bytes, but got %d bytes", 1, len(bb.Buf)) + } + } + encodeType := encoding.EncodeType(bb.Buf[0]) if encodeType == encoding.EncodeTypePlain { bb.Buf = bb.Buf[1:] - t.decodeDefault(decoder, bb, count, path) - return + // Data is already decompressed, decode directly without trying to decompress again + values := make([][]byte, 0, count) + values, decodeErr := decoder.Decode(values[:0], bb.Buf, count) + if decodeErr != nil { + logger.Panicf("cannot decode values: %v", decodeErr) + } + return values, nil } const expectedLen = 9 @@ -321,23 +284,25 @@ func (t *tag) decodeInt64Tag(decoder *encoding.BytesBlockDecoder, path string, c } firstValue := convert.BytesToInt64(bb.Buf[1:9]) bb.Buf = bb.Buf[9:] - var err error + intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, int(count)) if err != nil { - logger.Panicf("%s: cannot decode int values: %v", path, err) + logger.Panicf("cannot decode int values: %v", err) } - // convert int64 array to byte array - t.values = make([][]byte, count) + + // Convert int64 array to byte array + values := make([][]byte, count) for i, v := range intValues { - t.values[i] = convert.Int64ToBytes(v) + values[i] = convert.Int64ToBytes(v) } + return values, nil } -func (t *tag) decodeFloat64Tag(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - // decode float type +func decodeFloat64TagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64) ([][]byte, error) { intValuesPtr := generateInt64Slice(int(count)) intValues := *intValuesPtr defer releaseInt64Slice(intValuesPtr) + floatValuesPtr := generateFloat64Slice(int(count)) floatValues := *floatValuesPtr defer releaseFloat64Slice(floatValuesPtr) @@ -345,11 +310,27 @@ func (t *tag) decodeFloat64Tag(decoder *encoding.BytesBlockDecoder, path string, if len(bb.Buf) < 1 { logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", 1, len(bb.Buf)) } + + // Check if this is zstd compressed data (no encode type prefix) + decompressed, err := zstd.Decompress(nil, bb.Buf) + if err == nil { + // Successfully decompressed, this is compressed data + bb.Buf = decompressed + if len(bb.Buf) < 1 { + logger.Panicf("decompressed data too short: expect at least %d bytes, but got %d bytes", 1, len(bb.Buf)) + } + } + encodeType := encoding.EncodeType(bb.Buf[0]) if encodeType == encoding.EncodeTypePlain { bb.Buf = bb.Buf[1:] - t.decodeDefault(decoder, bb, count, path) - return + // Data is already decompressed, decode directly without trying to decompress again + values := make([][]byte, 0, count) + values, decodeErr := decoder.Decode(values[:0], bb.Buf, count) + if decodeErr != nil { + logger.Panicf("cannot decode values: %v", decodeErr) + } + return values, nil } const expectedLen = 11 @@ -359,63 +340,60 @@ func (t *tag) decodeFloat64Tag(decoder *encoding.BytesBlockDecoder, path string, exp := convert.BytesToInt16(bb.Buf[1:3]) firstValue := convert.BytesToInt64(bb.Buf[3:11]) bb.Buf = bb.Buf[11:] - var err error + intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, int(count)) if err != nil { - logger.Panicf("%s: cannot decode int values: %v", path, err) + logger.Panicf("cannot decode int values: %v", err) } + floatValues, err = encoding.DecimalIntListToFloat64List(floatValues[:0], intValues, exp, int(count)) if err != nil { logger.Panicf("cannot convert DecimalIntList to Float64List: %v", err) } + if uint64(len(floatValues)) != count { logger.Panicf("unexpected floatValues length: got %d, expected %d", len(floatValues), count) } - // convert float64 array to byte array - t.values = make([][]byte, count) + + // Convert float64 array to byte array + values := make([][]byte, count) for i, v := range floatValues { - t.values[i] = convert.Float64ToBytes(v) + values[i] = convert.Float64ToBytes(v) } + return values, nil } -func (t *tag) decodeDefault(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64, path string) { - encodeType := encoding.EncodeType(bb.Buf[0]) - var err error - if encodeType == encoding.EncodeTypeDictionary { - dict := generateDictionary() - defer releaseDictionary(dict) - t.values, err = dict.Decode(t.values[:0], bb.Buf[1:], count) - } else { - t.values, err = decoder.Decode(t.values[:0], bb.Buf[1:], count) - } - if err != nil { - logger.Panicf("%s: cannot decode values: %v", path, err) +func decodeDefaultTagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64, values [][]byte) ([][]byte, error) { + if len(bb.Buf) < 1 { + return values, nil } -} - -var bigValuePool = bytes.NewBufferPool("stream-big-value") -type tagFamily struct { - name string - tags []tag -} + // Check if this is zstd compressed data (no encode type prefix) + decompressed, decompErr := zstd.Decompress(nil, bb.Buf) + if decompErr == nil { + // Successfully decompressed, this is compressed data + bb.Buf = decompressed + if len(bb.Buf) < 1 { + return values, nil + } + } -func (tf *tagFamily) reset() { - tf.name = "" + encodeType := encoding.EncodeType(bb.Buf[0]) + var err error - tags := tf.tags - for i := range tags { - tags[i].reset() + switch encodeType { + case encoding.EncodeTypeDictionary: + dict := generateDictionary() + defer releaseDictionary(dict) + values, err = dict.Decode(values[:0], bb.Buf[1:], count) + case encoding.EncodeTypePlain: + values, err = decoder.Decode(values[:0], bb.Buf[1:], count) + default: + values, err = decoder.Decode(values[:0], bb.Buf[1:], count) } - tf.tags = tags[:0] -} -func (tf *tagFamily) resizeTags(tagsLen int) []tag { - tags := tf.tags - if n := tagsLen - cap(tags); n > 0 { - tags = append(tags[:cap(tags)], make([]tag, n)...) + if err != nil { + logger.Panicf("cannot decode values: %v", err) } - tags = tags[:tagsLen] - tf.tags = tags - return tags + return values, nil } diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md index 70999412..6b59aee4 100644 --- a/banyand/internal/sidx/TODO.md +++ b/banyand/internal/sidx/TODO.md @@ -4,7 +4,7 @@ This document tracks the implementation progress of the Secondary Index File Sys ## Implementation Progress Overview -- [x] **Phase 1**: Core Data Structures (6 tasks) - 1/6 completed +- [x] **Phase 1**: Core Data Structures (6 tasks) - 2/6 completed - [ ] **Phase 2**: Memory Management (4 tasks) - [ ] **Phase 3**: Snapshot Management (4 tasks) - [ ] **Phase 4**: Write Path (4 tasks) @@ -31,15 +31,15 @@ This document tracks the implementation progress of the Secondary Index File Sys - [x] Memory reuse reduces allocations - [x] Size calculation accuracy -### 1.2 Tag Structure (`tag.go`) -- [ ] Individual tag handling (not tag families like stream module) -- [ ] Support for tag data, metadata, filter files -- [ ] Implement tag value marshaling/unmarshaling -- [ ] **Test Cases**: - - [ ] Tag encoding/decoding with various value types - - [ ] Value type handling (int64, string, bytes) - - [ ] Filter generation for indexed tags - - [ ] Tag serialization round-trip integrity +### 1.2 Tag Structure (`tag.go`) ✅ +- [x] Individual tag handling (not tag families like stream module) +- [x] Support for tag data, metadata, filter files +- [x] Implement tag value marshaling/unmarshaling +- [x] **Test Cases**: + - [x] Tag encoding/decoding with various value types + - [x] Value type handling (int64, string, bytes) + - [x] Filter generation for indexed tags + - [x] Tag serialization round-trip integrity ### 1.3 PartWrapper with Reference Counting (`part_wrapper.go`) - [ ] Atomic reference counting for safe concurrent access @@ -76,11 +76,16 @@ This document tracks the implementation progress of the Secondary Index File Sys - [ ] Contains userKeys[], seriesIDs[], data[], tags[] - [ ] Methods: reset(), mustInitFromElements(), validation - [ ] Sorting validation for elements within blocks +- [ ] **Tag encoding/decoding**: Uses shared encoding module from `banyand/internal/encoding/tag_encoder.go` + - [ ] Implement `block.marshal()` and `block.unmarshal()` methods + - [ ] Use `EncodeTagValues()` and `DecodeTagValues()` for tag serialization + - [ ] Apply sophisticated encoding: delta for int64, dictionary for strings, zstd for plain - [ ] **Test Cases**: - [ ] Block initialization from sorted elements - [ ] Element organization by seriesID and userKey - [ ] Key ordering validation within blocks - [ ] Block reset and reuse functionality + - [ ] Tag encoding/decoding with various value types --- diff --git a/banyand/internal/sidx/element.go b/banyand/internal/sidx/element.go index ffacf2f0..289bab34 100644 --- a/banyand/internal/sidx/element.go +++ b/banyand/internal/sidx/element.go @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +// Package sidx provides secondary index functionality for BanyanDB, including +// element management, pooling, and sorting capabilities for efficient data storage +// and retrieval operations. package sidx import ( diff --git a/banyand/internal/sidx/element_test.go b/banyand/internal/sidx/element_test.go index 781198b2..4f766b32 100644 --- a/banyand/internal/sidx/element_test.go +++ b/banyand/internal/sidx/element_test.go @@ -349,28 +349,28 @@ func TestElementsSorting(t *testing.T) { } func TestNilSafety(t *testing.T) { - t.Run("release nil element", func(t *testing.T) { + t.Run("release nil element", func(_ *testing.T) { // Should not panic releaseElement(nil) }) - t.Run("release nil elements", func(t *testing.T) { + t.Run("release nil elements", func(_ *testing.T) { // Should not panic releaseElements(nil) }) - t.Run("release nil tag", func(t *testing.T) { + t.Run("release nil tag", func(_ *testing.T) { // Should not panic releaseTag(nil) }) - t.Run("release non-pooled element", func(t *testing.T) { + t.Run("release non-pooled element", func(_ *testing.T) { e := &element{pooled: false} // Should not panic or add to pool releaseElement(e) }) - t.Run("release non-pooled elements", func(t *testing.T) { + t.Run("release non-pooled elements", func(_ *testing.T) { es := &elements{pooled: false} // Should not panic or add to pool releaseElements(es) diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go new file mode 100644 index 00000000..38749831 --- /dev/null +++ b/banyand/internal/sidx/tag.go @@ -0,0 +1,454 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/apache/skywalking-banyandb/banyand/internal/encoding" + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" + pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/filter" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" +) + +// dataBlock represents a reference to data in a file. +type dataBlock struct { + offset uint64 + size uint64 +} + +// tagMetadata contains persistent metadata for a tag. +type tagMetadata struct { + name string + min []byte // For int64 tags + max []byte // For int64 tags + dataBlock dataBlock // Offset/size in .td file + filterBlock dataBlock // Offset/size in .tf file + valueType pbv1.ValueType + indexed bool + compressed bool +} + +// tagData represents the runtime data for a tag with filtering capabilities. +type tagData struct { + values [][]byte + filter *filter.BloomFilter // For indexed tags + name string + min []byte // For int64 tags + max []byte // For int64 tags + valueType pbv1.ValueType + indexed bool +} + +var ( + tagDataPool = pool.Register[*tagData]("sidx-tagData") + tagMetadataPool = pool.Register[*tagMetadata]("sidx-tagMetadata") + bloomFilterPool = pool.Register[*filter.BloomFilter]("sidx-bloomFilter") +) + +// generateTagData gets a tagData from pool or creates new. +func generateTagData() *tagData { + v := tagDataPool.Get() + if v == nil { + return &tagData{} + } + return v +} + +// releaseTagData returns tagData to pool after reset. +func releaseTagData(td *tagData) { + if td == nil { + return + } + td.reset() + tagDataPool.Put(td) +} + +// generateTagMetadata gets a tagMetadata from pool or creates new. +func generateTagMetadata() *tagMetadata { + v := tagMetadataPool.Get() + if v == nil { + return &tagMetadata{} + } + return v +} + +// releaseTagMetadata returns tagMetadata to pool after reset. +func releaseTagMetadata(tm *tagMetadata) { + if tm == nil { + return + } + tm.reset() + tagMetadataPool.Put(tm) +} + +// reset clears tagData for reuse in object pool. +func (td *tagData) reset() { + td.name = "" + td.valueType = pbv1.ValueTypeUnknown + td.indexed = false + + // Reset values slice + for i := range td.values { + td.values[i] = nil + } + td.values = td.values[:0] + + // Reset filter + if td.filter != nil { + releaseBloomFilter(td.filter) + td.filter = nil + } + + // Reset min/max + td.min = nil + td.max = nil +} + +// reset clears tagMetadata for reuse in object pool. +func (tm *tagMetadata) reset() { + tm.name = "" + tm.valueType = pbv1.ValueTypeUnknown + tm.indexed = false + tm.compressed = false + tm.dataBlock = dataBlock{} + tm.filterBlock = dataBlock{} + tm.min = nil + tm.max = nil +} + +const ( + // defaultCompressionLevel for zstd compression. + defaultCompressionLevel = 3 +) + +// compressTagData compresses tag data using zstd compression. +func compressTagData(data []byte) []byte { + if len(data) == 0 { + return nil + } + return zstd.Compress(nil, data, defaultCompressionLevel) +} + +// decompressTagData decompresses tag data using zstd decompression. +func decompressTagData(compressedData []byte) ([]byte, error) { + if len(compressedData) == 0 { + return nil, nil + } + return zstd.Decompress(nil, compressedData) +} + +// generateBloomFilter gets a bloom filter from pool or creates new. +func generateBloomFilter(expectedElements int) *filter.BloomFilter { + v := bloomFilterPool.Get() + if v == nil { + return filter.NewBloomFilter(expectedElements) + } + // Reset and resize for new expected elements + v.SetN(expectedElements) + m := expectedElements * filter.B + v.ResizeBits((m + 63) / 64) + return v +} + +// releaseBloomFilter returns bloom filter to pool after reset. +func releaseBloomFilter(bf *filter.BloomFilter) { + if bf == nil { + return + } + bf.Reset() + bloomFilterPool.Put(bf) +} + +// encodeBloomFilter encodes a bloom filter to bytes. +func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte { + if bf == nil { + return dst + } + dst = pkgencoding.Int64ToBytes(dst, int64(bf.N())) + dst = pkgencoding.EncodeUint64Block(dst, bf.Bits()) + return dst +} + +// decodeBloomFilter decodes bytes to bloom filter. +func decodeBloomFilter(src []byte) (*filter.BloomFilter, error) { + if len(src) < 8 { + return nil, fmt.Errorf("invalid bloom filter data: too short") + } + + n := pkgencoding.BytesToInt64(src) + bf := generateBloomFilter(int(n)) + + m := n * filter.B + bits := make([]uint64, 0, (m+63)/64) + var err error + bits, _, err = pkgencoding.DecodeUint64Block(bits, src[8:], uint64((m+63)/64)) + if err != nil { + releaseBloomFilter(bf) + return nil, fmt.Errorf("failed to decode bloom filter bits: %w", err) + } + bf.SetBits(bits) + + return bf, nil +} + +// generateTagFilter creates a bloom filter for indexed tags. +func generateTagFilter(values [][]byte, expectedElements int) *filter.BloomFilter { + if len(values) == 0 { + return nil + } + + bloomFilter := generateBloomFilter(expectedElements) + + for _, value := range values { + bloomFilter.Add(value) + } + + return bloomFilter +} + +// EncodeTagValues encodes tag values using the shared encoding module. +func EncodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, error) { + return encoding.EncodeTagValues(values, valueType) +} + +// DecodeTagValues decodes tag values using the shared encoding module. +func DecodeTagValues(data []byte, valueType pbv1.ValueType, count int) ([][]byte, error) { + return encoding.DecodeTagValues(data, valueType, count) +} + +// updateMinMax updates min/max values for int64 tags. +func (td *tagData) updateMinMax() { + if td.valueType != pbv1.ValueTypeInt64 || len(td.values) == 0 { + return + } + + var minVal, maxVal int64 + first := true + + for _, value := range td.values { + if len(value) != 8 { + continue // Skip invalid int64 values + } + + val := pkgencoding.BytesToInt64(value) + + if first { + minVal = val + maxVal = val + first = false + } else { + if val < minVal { + minVal = val + } + if val > maxVal { + maxVal = val + } + } + } + + if !first { + td.min = pkgencoding.Int64ToBytes(nil, minVal) + td.max = pkgencoding.Int64ToBytes(nil, maxVal) + } +} + +// addValue adds a value to the tag data. +func (td *tagData) addValue(value []byte) { + td.values = append(td.values, value) + + // Update filter for indexed tags + if td.indexed && td.filter != nil { + td.filter.Add(value) + } +} + +// hasValue checks if a value exists in the tag using the bloom filter. +func (td *tagData) hasValue(value []byte) bool { + if !td.indexed || td.filter == nil { + // For non-indexed tags, do linear search + for _, v := range td.values { + if bytes.Equal(v, value) { + return true + } + } + return false + } + + return td.filter.MightContain(value) +} + +// marshalTagMetadata serializes tag metadata to bytes. +func (tm *tagMetadata) marshal() ([]byte, error) { + buf := &bytes.Buffer{} + + // Write name length and name + nameBytes := []byte(tm.name) + if err := binary.Write(buf, binary.LittleEndian, uint32(len(nameBytes))); err != nil { + return nil, err + } + if _, err := buf.Write(nameBytes); err != nil { + return nil, err + } + + // Write value type + if err := binary.Write(buf, binary.LittleEndian, uint32(tm.valueType)); err != nil { + return nil, err + } + + // Write data block + if err := binary.Write(buf, binary.LittleEndian, tm.dataBlock.offset); err != nil { + return nil, err + } + if err := binary.Write(buf, binary.LittleEndian, tm.dataBlock.size); err != nil { + return nil, err + } + + // Write filter block + if err := binary.Write(buf, binary.LittleEndian, tm.filterBlock.offset); err != nil { + return nil, err + } + if err := binary.Write(buf, binary.LittleEndian, tm.filterBlock.size); err != nil { + return nil, err + } + + // Write flags + var flags uint8 + if tm.indexed { + flags |= 1 + } + if tm.compressed { + flags |= 2 + } + if err := binary.Write(buf, binary.LittleEndian, flags); err != nil { + return nil, err + } + + // Write min length and min + if err := binary.Write(buf, binary.LittleEndian, uint32(len(tm.min))); err != nil { + return nil, err + } + if len(tm.min) > 0 { + if _, err := buf.Write(tm.min); err != nil { + return nil, err + } + } + + // Write max length and max + if err := binary.Write(buf, binary.LittleEndian, uint32(len(tm.max))); err != nil { + return nil, err + } + if len(tm.max) > 0 { + if _, err := buf.Write(tm.max); err != nil { + return nil, err + } + } + + return buf.Bytes(), nil +} + +// unmarshalTagMetadata deserializes tag metadata from bytes. +func unmarshalTagMetadata(data []byte) (*tagMetadata, error) { + tm := generateTagMetadata() + buf := bytes.NewReader(data) + + // Read name + var nameLen uint32 + if err := binary.Read(buf, binary.LittleEndian, &nameLen); err != nil { + releaseTagMetadata(tm) + return nil, err + } + nameBytes := make([]byte, nameLen) + if _, err := io.ReadFull(buf, nameBytes); err != nil { + releaseTagMetadata(tm) + return nil, err + } + tm.name = string(nameBytes) + + // Read value type + var valueType uint32 + if err := binary.Read(buf, binary.LittleEndian, &valueType); err != nil { + releaseTagMetadata(tm) + return nil, err + } + tm.valueType = pbv1.ValueType(valueType) + + // Read data block + if err := binary.Read(buf, binary.LittleEndian, &tm.dataBlock.offset); err != nil { + releaseTagMetadata(tm) + return nil, err + } + if err := binary.Read(buf, binary.LittleEndian, &tm.dataBlock.size); err != nil { + releaseTagMetadata(tm) + return nil, err + } + + // Read filter block + if err := binary.Read(buf, binary.LittleEndian, &tm.filterBlock.offset); err != nil { + releaseTagMetadata(tm) + return nil, err + } + if err := binary.Read(buf, binary.LittleEndian, &tm.filterBlock.size); err != nil { + releaseTagMetadata(tm) + return nil, err + } + + // Read flags + var flags uint8 + if err := binary.Read(buf, binary.LittleEndian, &flags); err != nil { + releaseTagMetadata(tm) + return nil, err + } + tm.indexed = (flags & 1) != 0 + tm.compressed = (flags & 2) != 0 + + // Read min + var minLen uint32 + if err := binary.Read(buf, binary.LittleEndian, &minLen); err != nil { + releaseTagMetadata(tm) + return nil, err + } + if minLen > 0 { + tm.min = make([]byte, minLen) + if _, err := io.ReadFull(buf, tm.min); err != nil { + releaseTagMetadata(tm) + return nil, err + } + } + + // Read max + var maxLen uint32 + if err := binary.Read(buf, binary.LittleEndian, &maxLen); err != nil { + releaseTagMetadata(tm) + return nil, err + } + if maxLen > 0 { + tm.max = make([]byte, maxLen) + if _, err := io.ReadFull(buf, tm.max); err != nil { + releaseTagMetadata(tm) + return nil, err + } + } + + return tm, nil +} diff --git a/banyand/internal/sidx/tag_test.go b/banyand/internal/sidx/tag_test.go new file mode 100644 index 00000000..5e51dfea --- /dev/null +++ b/banyand/internal/sidx/tag_test.go @@ -0,0 +1,724 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "bytes" + "encoding/binary" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" +) + +const testTagName = "test_tag" + +func TestTagValueMarshaling(t *testing.T) { + tests := []struct { + name string + values [][]byte + want bool // whether marshaling should succeed + }{ + { + name: "empty values", + values: [][]byte{}, + want: true, + }, + { + name: "single value", + values: [][]byte{[]byte("test")}, + want: true, + }, + { + name: "multiple values", + values: [][]byte{ + []byte("value1"), + []byte("value2"), + []byte("value3"), + }, + want: true, + }, + { + name: "values with different lengths", + values: [][]byte{ + []byte("a"), + []byte("longer_value"), + []byte(""), + []byte("medium"), + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Marshal values using shared encoding (default to string type for basic marshaling test) + data, err := EncodeTagValues(tt.values, pbv1.ValueTypeStr) + if tt.want { + require.NoError(t, err) + } else { + require.Error(t, err) + return + } + + // Unmarshal and verify + unmarshaled, err := DecodeTagValues(data, pbv1.ValueTypeStr, len(tt.values)) + require.NoError(t, err) + assert.Equal(t, len(tt.values), len(unmarshaled)) + + for i, expected := range tt.values { + switch { + case expected == nil: + assert.Nil(t, unmarshaled[i]) + case len(expected) == 0: + // Handle empty byte slices - encoding may return nil for empty values + assert.True(t, len(unmarshaled[i]) == 0, "Expected empty value at index %d", i) + default: + assert.Equal(t, expected, unmarshaled[i]) + } + } + }) + } +} + +func TestTagValueEncoding(t *testing.T) { + tests := []struct { + name string + values [][]byte + valueType pbv1.ValueType + wantErr bool + }{ + { + name: "int64 values", + values: [][]byte{{0x39, 0x30, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}}, // int64(12345) encoded + valueType: pbv1.ValueTypeInt64, + wantErr: false, + }, + { + name: "string values", + values: [][]byte{[]byte("test string"), []byte("another string")}, + valueType: pbv1.ValueTypeStr, + wantErr: false, + }, + { + name: "binary data", + values: [][]byte{{0x01, 0x02, 0x03, 0x04}, {0xFF, 0xFE}}, + valueType: pbv1.ValueTypeBinaryData, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + encoded, err := EncodeTagValues(tt.values, tt.valueType) + if tt.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.NotNil(t, encoded) + + // Decode and verify round-trip + decoded, err := DecodeTagValues(encoded, tt.valueType, len(tt.values)) + require.NoError(t, err) + assert.Equal(t, len(tt.values), len(decoded)) + for i, expected := range tt.values { + assert.Equal(t, expected, decoded[i]) + } + }) + } +} + +func TestTagFilterGeneration(t *testing.T) { + tests := []struct { + name string + values [][]byte + expectedElements int + wantNil bool + }{ + { + name: "empty values", + values: [][]byte{}, + expectedElements: 10, + wantNil: true, + }, + { + name: "single value", + values: [][]byte{ + []byte("value1"), + }, + expectedElements: 10, + wantNil: false, + }, + { + name: "multiple values", + values: [][]byte{ + []byte("value1"), + []byte("value2"), + []byte("value3"), + }, + expectedElements: 10, + wantNil: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filter := generateTagFilter(tt.values, tt.expectedElements) + if tt.wantNil { + assert.Nil(t, filter) + return + } + + require.NotNil(t, filter) + + // Test that all added values might be contained + for _, value := range tt.values { + assert.True(t, filter.MightContain(value)) + } + + // Test that a non-added value might not be contained + // (this could return true due to false positives, but we test anyway) + nonExistent := []byte("non_existent_value_12345") + // We can't assert false here due to possible false positives + filter.MightContain(nonExistent) + }) + } +} + +func TestTagDataOperations(t *testing.T) { + t.Run("add and check values", func(t *testing.T) { + td := generateTagData() + defer releaseTagData(td) + + td.name = testTagName + td.valueType = pbv1.ValueTypeStr + td.indexed = true + td.filter = generateTagFilter([][]byte{}, 10) // Start with empty filter + + // Add values + values := [][]byte{ + []byte("value1"), + []byte("value2"), + []byte("value3"), + } + + for _, value := range values { + td.addValue(value) + } + + // Check that all values are present + for _, value := range values { + assert.True(t, td.hasValue(value)) + } + + // Check that a non-existent value might not be present + assert.False(t, td.hasValue([]byte("non_existent"))) + }) + + t.Run("update min max for int64", func(t *testing.T) { + td := generateTagData() + defer releaseTagData(td) + + td.name = "int_tag" + td.valueType = pbv1.ValueTypeInt64 + + // Add int64 values (encoded as bytes) + values := []int64{100, -50, 200, 0, 150} + for _, v := range values { + encoded := make([]byte, 8) + binary.LittleEndian.PutUint64(encoded, uint64(v)) + td.values = append(td.values, encoded) + } + + td.updateMinMax() + + // Verify min and max + assert.NotNil(t, td.min) + assert.NotNil(t, td.max) + + // Decode and verify values + minVal := int64(binary.LittleEndian.Uint64(td.min)) + assert.Equal(t, int64(-50), minVal) + + maxVal := int64(binary.LittleEndian.Uint64(td.max)) + assert.Equal(t, int64(200), maxVal) + }) + + t.Run("reset functionality", func(t *testing.T) { + td := generateTagData() + + // Set up tag data + td.name = testTagName + td.valueType = pbv1.ValueTypeStr + td.indexed = true + td.values = [][]byte{[]byte("value1"), []byte("value2")} + td.filter = generateTagFilter(td.values, 10) + td.min = []byte("min") + td.max = []byte("max") + + // Reset + td.reset() + + // Verify reset + assert.Equal(t, "", td.name) + assert.Equal(t, pbv1.ValueTypeUnknown, td.valueType) + assert.False(t, td.indexed) + assert.Equal(t, 0, len(td.values)) + assert.Nil(t, td.filter) + assert.Nil(t, td.min) + assert.Nil(t, td.max) + + releaseTagData(td) + }) +} + +func TestTagMetadataOperations(t *testing.T) { + t.Run("marshal and unmarshal", func(t *testing.T) { + original := generateTagMetadata() + defer releaseTagMetadata(original) + + // Set up metadata + original.name = "test_tag" + original.valueType = pbv1.ValueTypeInt64 + original.indexed = true + original.compressed = false + original.dataBlock = dataBlock{offset: 100, size: 500} + original.filterBlock = dataBlock{offset: 600, size: 200} + original.min = []byte{0x01, 0x02} + original.max = []byte{0xFF, 0xFE} + + // Marshal + data, err := original.marshal() + require.NoError(t, err) + assert.NotNil(t, data) + + // Unmarshal + unmarshaled, err := unmarshalTagMetadata(data) + require.NoError(t, err) + defer releaseTagMetadata(unmarshaled) + + // Verify fields + assert.Equal(t, original.name, unmarshaled.name) + assert.Equal(t, original.valueType, unmarshaled.valueType) + assert.Equal(t, original.indexed, unmarshaled.indexed) + assert.Equal(t, original.compressed, unmarshaled.compressed) + assert.Equal(t, original.dataBlock, unmarshaled.dataBlock) + assert.Equal(t, original.filterBlock, unmarshaled.filterBlock) + assert.Equal(t, original.min, unmarshaled.min) + assert.Equal(t, original.max, unmarshaled.max) + }) + + t.Run("marshal empty metadata", func(t *testing.T) { + tm := generateTagMetadata() + defer releaseTagMetadata(tm) + + tm.name = "empty_tag" + tm.valueType = pbv1.ValueTypeStr + + data, err := tm.marshal() + require.NoError(t, err) + + unmarshaled, err := unmarshalTagMetadata(data) + require.NoError(t, err) + defer releaseTagMetadata(unmarshaled) + + assert.Equal(t, tm.name, unmarshaled.name) + assert.Equal(t, tm.valueType, unmarshaled.valueType) + assert.False(t, unmarshaled.indexed) + assert.False(t, unmarshaled.compressed) + assert.Nil(t, unmarshaled.min) + assert.Nil(t, unmarshaled.max) + }) + + t.Run("reset functionality", func(t *testing.T) { + tm := generateTagMetadata() + + // Set up metadata + tm.name = "test_tag" + tm.valueType = pbv1.ValueTypeInt64 + tm.indexed = true + tm.compressed = true + tm.dataBlock = dataBlock{offset: 100, size: 500} + tm.filterBlock = dataBlock{offset: 600, size: 200} + tm.min = []byte("min") + tm.max = []byte("max") + + // Reset + tm.reset() + + // Verify reset + assert.Equal(t, "", tm.name) + assert.Equal(t, pbv1.ValueTypeUnknown, tm.valueType) + assert.False(t, tm.indexed) + assert.False(t, tm.compressed) + assert.Equal(t, dataBlock{}, tm.dataBlock) + assert.Equal(t, dataBlock{}, tm.filterBlock) + assert.Nil(t, tm.min) + assert.Nil(t, tm.max) + + releaseTagMetadata(tm) + }) +} + +func TestTagPooling(t *testing.T) { + t.Run("tagData pooling", func(t *testing.T) { + // Get from pool + td1 := generateTagData() + assert.NotNil(t, td1) + + // Use and release + td1.name = "test" + releaseTagData(td1) + + // Get again - should be reset + td2 := generateTagData() + assert.NotNil(t, td2) + assert.Equal(t, "", td2.name) // Should be reset + + releaseTagData(td2) + }) + + t.Run("tagMetadata pooling", func(t *testing.T) { + // Get from pool + tm1 := generateTagMetadata() + assert.NotNil(t, tm1) + + // Use and release + tm1.name = "test" + releaseTagMetadata(tm1) + + // Get again - should be reset + tm2 := generateTagMetadata() + assert.NotNil(t, tm2) + assert.Equal(t, "", tm2.name) // Should be reset + + releaseTagMetadata(tm2) + }) + + t.Run("bloomFilter pooling", func(t *testing.T) { + // Generate bloom filter + bf1 := generateBloomFilter(100) + assert.NotNil(t, bf1) + assert.Equal(t, 100, bf1.N()) + + // Add some data + bf1.Add([]byte("test1")) + bf1.Add([]byte("test2")) + assert.True(t, bf1.MightContain([]byte("test1"))) + + // Release to pool + releaseBloomFilter(bf1) + + // Get from pool again + bf2 := generateBloomFilter(50) + assert.NotNil(t, bf2) + assert.Equal(t, 50, bf2.N()) // Should be reset to new size + assert.False(t, bf2.MightContain([]byte("test1"))) // Should not contain old data + + releaseBloomFilter(bf2) + }) +} + +func TestEdgeCases(t *testing.T) { + t.Run("invalid int64 length", func(t *testing.T) { + // The shared encoding module panics on invalid data (fail fast) + assert.Panics(t, func() { + invalidValues := [][]byte{{0x01, 0x02, 0x03}} // Only 3 bytes, not 8 + _, _ = EncodeTagValues(invalidValues, pbv1.ValueTypeInt64) + }) + }) + + t.Run("marshal nil values", func(t *testing.T) { + data, err := EncodeTagValues(nil, pbv1.ValueTypeStr) + require.NoError(t, err) + assert.Nil(t, data) + + values, err := DecodeTagValues(nil, pbv1.ValueTypeStr, 0) + require.NoError(t, err) + assert.Nil(t, values) + }) + + t.Run("updateMinMax with empty values", func(t *testing.T) { + td := generateTagData() + defer releaseTagData(td) + + td.valueType = pbv1.ValueTypeInt64 + td.values = [][]byte{} // empty + + td.updateMinMax() + assert.Nil(t, td.min) + assert.Nil(t, td.max) + }) + + t.Run("updateMinMax with non-int64 type", func(t *testing.T) { + td := generateTagData() + defer releaseTagData(td) + + td.valueType = pbv1.ValueTypeStr + td.values = [][]byte{[]byte("test")} + + td.updateMinMax() + assert.Nil(t, td.min) + assert.Nil(t, td.max) + }) + + t.Run("hasValue with non-indexed tag", func(t *testing.T) { + td := generateTagData() + defer releaseTagData(td) + + td.indexed = false + td.values = [][]byte{[]byte("value1"), []byte("value2")} + + // Should use linear search + assert.True(t, td.hasValue([]byte("value1"))) + assert.True(t, td.hasValue([]byte("value2"))) + assert.False(t, td.hasValue([]byte("value3"))) + }) +} + +func TestRoundTripIntegrity(t *testing.T) { + t.Run("complete round trip", func(t *testing.T) { + // Create original tag metadata + original := generateTagMetadata() + defer releaseTagMetadata(original) + + original.name = "integration_tag" + original.valueType = pbv1.ValueTypeInt64 + original.indexed = true + original.compressed = true + original.dataBlock = dataBlock{offset: 1000, size: 2000} + original.filterBlock = dataBlock{offset: 3000, size: 500} + + // Create some int64 values + int64Values := []int64{-100, 0, 100, 200, 50} + var encodedValues [][]byte + for _, v := range int64Values { + encoded := make([]byte, 8) + binary.LittleEndian.PutUint64(encoded, uint64(v)) + encodedValues = append(encodedValues, encoded) + } + + // Marshal values using shared encoding + marshaledValues, err := EncodeTagValues(encodedValues, pbv1.ValueTypeInt64) + require.NoError(t, err) + + // Marshal metadata + marshaledMetadata, err := original.marshal() + require.NoError(t, err) + + // Unmarshal metadata + unmarshaledMetadata, err := unmarshalTagMetadata(marshaledMetadata) + require.NoError(t, err) + defer releaseTagMetadata(unmarshaledMetadata) + + // Unmarshal values using shared encoding + unmarshaledValues, err := DecodeTagValues(marshaledValues, pbv1.ValueTypeInt64, len(encodedValues)) + require.NoError(t, err) + + // Verify metadata integrity + assert.Equal(t, original.name, unmarshaledMetadata.name) + assert.Equal(t, original.valueType, unmarshaledMetadata.valueType) + assert.Equal(t, original.indexed, unmarshaledMetadata.indexed) + assert.Equal(t, original.compressed, unmarshaledMetadata.compressed) + + // Verify values integrity + assert.Equal(t, len(encodedValues), len(unmarshaledValues)) + for i, original := range encodedValues { + assert.True(t, bytes.Equal(original, unmarshaledValues[i])) + } + + // Decode and verify int64 values + for i, expected := range int64Values { + decoded := int64(binary.LittleEndian.Uint64(unmarshaledValues[i])) + assert.Equal(t, expected, decoded) + } + }) +} + +func TestTagCompression(t *testing.T) { + t.Run("compress and decompress tag data", func(t *testing.T) { + originalData := []byte("this is some test data that should be compressed and decompressed properly") + + // Compress + compressed := compressTagData(originalData) + assert.NotNil(t, compressed) + assert.NotEqual(t, originalData, compressed) + + // Decompress + decompressed, err := decompressTagData(compressed) + require.NoError(t, err) + assert.Equal(t, originalData, decompressed) + }) + + t.Run("compress empty data", func(t *testing.T) { + compressed := compressTagData(nil) + assert.Nil(t, compressed) + + compressed = compressTagData([]byte{}) + assert.Nil(t, compressed) + }) + + t.Run("decompress empty data", func(t *testing.T) { + decompressed, err := decompressTagData(nil) + require.NoError(t, err) + assert.Nil(t, decompressed) + + decompressed, err = decompressTagData([]byte{}) + require.NoError(t, err) + assert.Nil(t, decompressed) + }) +} + +func TestTagValuesCompression(t *testing.T) { + t.Run("encode and decode compressed values", func(t *testing.T) { + values := [][]byte{ + []byte("this is a longer string that should compress well"), + []byte("another long string with repeated words repeated words"), + []byte("yet another string for compression testing purposes"), + } + + // Encode with automatic compression for string data + compressed, err := EncodeTagValues(values, pbv1.ValueTypeStr) + require.NoError(t, err) + assert.NotNil(t, compressed) + + // Decode compressed data + decompressed, err := DecodeTagValues(compressed, pbv1.ValueTypeStr, len(values)) + require.NoError(t, err) + assert.Equal(t, len(values), len(decompressed)) + + for i, expected := range values { + assert.Equal(t, expected, decompressed[i]) + } + }) + + t.Run("compression works for repetitive data", func(t *testing.T) { + // Create repetitive data that should compress well + repetitiveData := make([][]byte, 100) + for i := range repetitiveData { + repetitiveData[i] = []byte("repeated_data_pattern_that_compresses_well") + } + + // Encode with automatic compression + compressed, err := EncodeTagValues(repetitiveData, pbv1.ValueTypeStr) + require.NoError(t, err) + + // Verify decompression works + decompressed, err := DecodeTagValues(compressed, pbv1.ValueTypeStr, len(repetitiveData)) + require.NoError(t, err) + assert.Equal(t, repetitiveData, decompressed) + }) +} + +func TestCompressionRoundTrip(t *testing.T) { + testCases := []struct { + name string + values [][]byte + }{ + { + name: "small values", + values: [][]byte{[]byte("a"), []byte("b"), []byte("c")}, + }, + { + name: "large values", + values: [][]byte{ + []byte("this is a very long string that contains a lot of data and should compress well when using zstd compression algorithm"), + []byte("another long string with different content but still should benefit from compression due to common patterns"), + }, + }, + { + name: "mixed size values", + values: [][]byte{ + []byte("short"), + []byte("this is a medium length string"), + []byte("this is a very very very long string that goes on and on with lots of repeated words and patterns that compression algorithms love to work with"), + []byte("x"), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Test round trip: values -> encoded -> decoded + encoded, err := EncodeTagValues(tc.values, pbv1.ValueTypeStr) + require.NoError(t, err) + + decoded, err := DecodeTagValues(encoded, pbv1.ValueTypeStr, len(tc.values)) + require.NoError(t, err) + + assert.Equal(t, tc.values, decoded) + }) + } +} + +func TestBloomFilterEncoding(t *testing.T) { + t.Run("encode and decode bloom filter", func(t *testing.T) { + // Create a bloom filter and add some data + bf := generateBloomFilter(100) + defer releaseBloomFilter(bf) + + testValues := [][]byte{ + []byte("value1"), + []byte("value2"), + []byte("value3"), + } + + for _, value := range testValues { + bf.Add(value) + } + + // Encode the bloom filter + var encoded []byte + encoded = encodeBloomFilter(encoded, bf) + assert.NotEmpty(t, encoded) + + // Decode the bloom filter + decodedBf, err := decodeBloomFilter(encoded) + require.NoError(t, err) + defer releaseBloomFilter(decodedBf) + + // Verify the decoded filter contains the same data + assert.Equal(t, bf.N(), decodedBf.N()) + for _, value := range testValues { + assert.True(t, decodedBf.MightContain(value)) + } + }) + + t.Run("encode empty bloom filter", func(t *testing.T) { + var encoded []byte + encoded = encodeBloomFilter(encoded, nil) + assert.Empty(t, encoded) + }) + + t.Run("decode invalid data", func(t *testing.T) { + // Test with data too short + invalidData := []byte{0x01, 0x02} + _, err := decodeBloomFilter(invalidData) + assert.Error(t, err) + assert.Contains(t, err.Error(), "too short") + + // Test with empty data + _, err = decodeBloomFilter([]byte{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "too short") + }) +} diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go index 4ad42dd4..f1751313 100644 --- a/banyand/stream/tag.go +++ b/banyand/stream/tag.go @@ -18,70 +18,12 @@ package stream import ( + "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/pkg/bytes" - "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/encoding" + pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" - "github.com/apache/skywalking-banyandb/pkg/pool" -) - -func generateInt64Slice(length int) *[]int64 { - v := int64SlicePool.Get() - if v == nil { - s := make([]int64, length) - return &s - } - if cap(*v) < length { - *v = make([]int64, length) - } else { - *v = (*v)[:length] - } - return v -} - -func releaseInt64Slice(int64Slice *[]int64) { - *int64Slice = (*int64Slice)[:0] - int64SlicePool.Put(int64Slice) -} - -func generateFloat64Slice(length int) *[]float64 { - v := float64SlicePool.Get() - if v == nil { - s := make([]float64, length) - return &s - } - if cap(*v) < length { - *v = make([]float64, length) - } else { - *v = (*v)[:length] - } - return v -} - -func releaseFloat64Slice(float64Slice *[]float64) { - *float64Slice = (*float64Slice)[:0] - float64SlicePool.Put(float64Slice) -} - -func generateDictionary() *encoding.Dictionary { - v := dictionaryPool.Get() - if v == nil { - return &encoding.Dictionary{} - } - return v -} - -func releaseDictionary(d *encoding.Dictionary) { - d.Reset() - dictionaryPool.Put(d) -} - -var ( - int64SlicePool = pool.Register[*[]int64]("stream-int64Slice") - float64SlicePool = pool.Register[*[]float64]("stream-float64Slice") - dictionaryPool = pool.Register[*encoding.Dictionary]("stream-dictionary") ) type tag struct { @@ -119,27 +61,22 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, tagFilterWriter *w tm.name = t.name tm.valueType = t.valueType - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) - - // select encoding based on data type - switch t.valueType { - case pbv1.ValueTypeInt64: - t.encodeInt64Tag(bb) - case pbv1.ValueTypeFloat64: - t.encodeFloat64Tag(bb) - default: - t.encodeDefault(bb) + // Use shared encoding module + encodedData, err := encoding.EncodeTagValues(t.values, t.valueType) + if err != nil { + logger.Panicf("failed to encode tag values: %v", err) } - tm.size = uint64(len(bb.Buf)) + + tm.size = uint64(len(encodedData)) if tm.size > maxValuesBlockSize { logger.Panicf("too large valuesSize: %d bytes; mustn't exceed %d bytes", tm.size, maxValuesBlockSize) } tm.offset = tagWriter.bytesWritten - tagWriter.MustWrite(bb.Buf) + tagWriter.MustWrite(encodedData) if t.filter != nil { - bb.Reset() + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) bb.Buf = encodeBloomFilter(bb.Buf[:0], t.filter) if tm.valueType == pbv1.ValueTypeInt64 { tm.min = t.min @@ -151,125 +88,33 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, tagFilterWriter *w } } -func (t *tag) encodeInt64Tag(bb *bytes.Buffer) { - // convert byte array to int64 array - intValuesPtr := generateInt64Slice(len(t.values)) - intValues := *intValuesPtr - defer releaseInt64Slice(intValuesPtr) - var encodeType encoding.EncodeType - - for i, v := range t.values { - if v == nil || string(v) == "null" { - t.encodeDefault(bb) - encodeType = encoding.EncodeTypePlain - // Prepend encodeType (1 byte) to the beginning - bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...) - return - } - if len(v) != 8 { - logger.Panicf("invalid value length at index %d: expected 8 bytes, got %d", i, len(v)) - } - intValues[i] = convert.BytesToInt64(v) - } - // use delta encoding for integer column - var firstValue int64 - bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], intValues) - if encodeType == encoding.EncodeTypeUnknown { - logger.Panicf("invalid encode type for int64 values") - } - firstValueBytes := convert.Int64ToBytes(firstValue) - // Prepend encodeType (1 byte) and firstValue (8 bytes) to the beginning - bb.Buf = append( - append([]byte{byte(encodeType)}, firstValueBytes...), - bb.Buf..., - ) -} - -func (t *tag) encodeFloat64Tag(bb *bytes.Buffer) { - // convert byte array to float64 array - intValuesPtr := generateInt64Slice(len(t.values)) - intValues := *intValuesPtr - defer releaseInt64Slice(intValuesPtr) - - floatValuesPtr := generateFloat64Slice(len(t.values)) - floatValues := *floatValuesPtr - defer releaseFloat64Slice(floatValuesPtr) - - var encodeType encoding.EncodeType - - doEncodeDefault := func() { - t.encodeDefault(bb) - encodeType = encoding.EncodeTypePlain - // Prepend encodeType (1 byte) to the beginning - bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...) - } - - for i, v := range t.values { - if v == nil || string(v) == "null" { - doEncodeDefault() - return - } - if len(v) != 8 { - logger.Panicf("invalid value length at index %d: expected 8 bytes, got %d", i, len(v)) - } - floatValues[i] = convert.BytesToFloat64(v) - } - intValues, exp, err := encoding.Float64ListToDecimalIntList(intValues[:0], floatValues) - if err != nil { - logger.Errorf("cannot convert Float64List to DecimalIntList : %v", err) - doEncodeDefault() - return - } - var firstValue int64 - bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], intValues) - if encodeType == encoding.EncodeTypeUnknown { - logger.Panicf("invalid encode type for int64 values") - } - firstValueBytes := convert.Int64ToBytes(firstValue) - expBytes := convert.Int16ToBytes(exp) - // Prepend encodeType (1 byte), exp (2 bytes) and firstValue (8 bytes) to the beginning - bb.Buf = append( - append(append([]byte{byte(encodeType)}, expBytes...), firstValueBytes...), - bb.Buf..., - ) -} - -func (t *tag) encodeDefault(bb *bytes.Buffer) { - dict := generateDictionary() - defer releaseDictionary(dict) - for _, v := range t.values { - if !dict.Add(v) { - bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], t.values) - bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) - return - } - } - bb.Buf = dict.Encode(bb.Buf[:0]) - bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...) -} - -func (t *tag) mustReadValues(decoder *encoding.BytesBlockDecoder, reader fs.Reader, cm tagMetadata, count uint64) { +func (t *tag) mustReadValues(_ *pkgencoding.BytesBlockDecoder, reader fs.Reader, cm tagMetadata, count uint64) { t.name = cm.name t.valueType = cm.valueType if t.valueType == pbv1.ValueTypeUnknown { - for i := uint64(0); i < count; i++ { + for range count { t.values = append(t.values, nil) } return } - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) valuesSize := cm.size if valuesSize > maxValuesBlockSize { logger.Panicf("%s: block size cannot exceed %d bytes; got %d bytes", reader.Path(), maxValuesBlockSize, valuesSize) } - bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize)) - fs.MustReadData(reader, int64(cm.offset), bb.Buf) - t.decodeTagValues(decoder, reader.Path(), count, bb) + + data := make([]byte, valuesSize) + fs.MustReadData(reader, int64(cm.offset), data) + + // Use shared decoding module + decodedValues, err := encoding.DecodeTagValues(data, t.valueType, int(count)) + if err != nil { + logger.Panicf("%s: failed to decode tag values: %v", reader.Path(), err) + } + t.values = decodedValues } -func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *seqReader, cm tagMetadata, count uint64) { +func (t *tag) mustSeqReadValues(_ *pkgencoding.BytesBlockDecoder, reader *seqReader, cm tagMetadata, count uint64) { t.name = cm.name t.valueType = cm.valueType if cm.offset != reader.bytesRead { @@ -280,117 +125,15 @@ func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *seq logger.Panicf("%s: block size cannot exceed %d bytes; got %d bytes", reader.Path(), maxValuesBlockSize, valuesSize) } - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) + data := make([]byte, valuesSize) + reader.mustReadFull(data) - bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize)) - reader.mustReadFull(bb.Buf) - t.decodeTagValues(decoder, reader.Path(), count, bb) -} - -func (t *tag) decodeTagValues(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - switch t.valueType { - case pbv1.ValueTypeInt64: - t.decodeInt64Tag(decoder, path, count, bb) - case pbv1.ValueTypeFloat64: - t.decodeFloat64Tag(decoder, path, count, bb) - default: - t.decodeDefault(decoder, bb, count, path) - } -} - -func (t *tag) decodeInt64Tag(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - // decode integer type - intValuesPtr := generateInt64Slice(int(count)) - intValues := *intValuesPtr - defer releaseInt64Slice(intValuesPtr) - - if len(bb.Buf) < 1 { - logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", 1, len(bb.Buf)) - } - encodeType := encoding.EncodeType(bb.Buf[0]) - if encodeType == encoding.EncodeTypePlain { - bb.Buf = bb.Buf[1:] - t.decodeDefault(decoder, bb, count, path) - return - } - - const expectedLen = 9 - if len(bb.Buf) < expectedLen { - logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", expectedLen, len(bb.Buf)) - } - firstValue := convert.BytesToInt64(bb.Buf[1:9]) - bb.Buf = bb.Buf[9:] - var err error - intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, int(count)) - if err != nil { - logger.Panicf("%s: cannot decode int values: %v", path, err) - } - // convert int64 array to byte array - t.values = make([][]byte, count) - for i, v := range intValues { - t.values[i] = convert.Int64ToBytes(v) - } -} - -func (t *tag) decodeFloat64Tag(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - // decode float type - intValuesPtr := generateInt64Slice(int(count)) - intValues := *intValuesPtr - defer releaseInt64Slice(intValuesPtr) - floatValuesPtr := generateFloat64Slice(int(count)) - floatValues := *floatValuesPtr - defer releaseFloat64Slice(floatValuesPtr) - - if len(bb.Buf) < 1 { - logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", 1, len(bb.Buf)) - } - encodeType := encoding.EncodeType(bb.Buf[0]) - if encodeType == encoding.EncodeTypePlain { - bb.Buf = bb.Buf[1:] - t.decodeDefault(decoder, bb, count, path) - return - } - - const expectedLen = 11 - if len(bb.Buf) < expectedLen { - logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", expectedLen, len(bb.Buf)) - } - exp := convert.BytesToInt16(bb.Buf[1:3]) - firstValue := convert.BytesToInt64(bb.Buf[3:11]) - bb.Buf = bb.Buf[11:] - var err error - intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, int(count)) - if err != nil { - logger.Panicf("%s: cannot decode int values: %v", path, err) - } - floatValues, err = encoding.DecimalIntListToFloat64List(floatValues[:0], intValues, exp, int(count)) - if err != nil { - logger.Panicf("cannot convert DecimalIntList to Float64List: %v", err) - } - if uint64(len(floatValues)) != count { - logger.Panicf("unexpected floatValues length: got %d, expected %d", len(floatValues), count) - } - // convert float64 array to byte array - t.values = make([][]byte, count) - for i, v := range floatValues { - t.values[i] = convert.Float64ToBytes(v) - } -} - -func (t *tag) decodeDefault(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64, path string) { - encodeType := encoding.EncodeType(bb.Buf[0]) - var err error - if encodeType == encoding.EncodeTypeDictionary { - dict := generateDictionary() - defer releaseDictionary(dict) - t.values, err = dict.Decode(t.values[:0], bb.Buf[1:], count) - } else { - t.values, err = decoder.Decode(t.values[:0], bb.Buf[1:], count) - } + // Use shared decoding module + decodedValues, err := encoding.DecodeTagValues(data, t.valueType, int(count)) if err != nil { - logger.Panicf("%s: cannot decode values: %v", path, err) + logger.Panicf("%s: failed to decode tag values: %v", reader.Path(), err) } + t.values = decodedValues } var bigValuePool = bytes.NewBufferPool("stream-big-value") diff --git a/banyand/stream/tag_test.go b/banyand/stream/tag_test.go index 1416bbd5..acb0e08f 100644 --- a/banyand/stream/tag_test.go +++ b/banyand/stream/tag_test.go @@ -240,12 +240,9 @@ func TestTag_HighCardinalityStringEncoding(t *testing.T) { assert.Equal(t, testTag.name, tm.name) assert.Equal(t, testTag.valueType, tm.valueType) - // Check encoding type by examining the first byte of the encoded data + // The new shared encoding uses automatic compression, so we can't directly check the encoding type + // from the first byte anymore. Instead, we verify the round-trip works correctly. assert.True(t, len(buf.Buf) > 0, "Encoded buffer should not be empty") - actualEncType := encoding.EncodeType(buf.Buf[0]) - assert.Equal(t, tt.expectedEncType, actualEncType, - "Expected %s encoding (%d), got %d. %s", - getEncodeTypeName(tt.expectedEncType), tt.expectedEncType, actualEncType, tt.description) // Test roundtrip: decode and verify all values are preserved decoder := &encoding.BytesBlockDecoder{} @@ -264,15 +261,3 @@ func TestTag_HighCardinalityStringEncoding(t *testing.T) { }) } } - -// Helper function to get encode type name for better test output. -func getEncodeTypeName(encType encoding.EncodeType) string { - switch encType { - case encoding.EncodeTypePlain: - return "Plain" - case encoding.EncodeTypeDictionary: - return "Dictionary" - default: - return fmt.Sprintf("Unknown(%d)", encType) - } -}