This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new af494b13 Add integration test for trace (#741) af494b13 is described below commit af494b13bd2c406894d1a23735ad3e0388cb9779 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Aug 28 23:09:21 2025 +0800 Add integration test for trace (#741) --- .golangci.yml | 2 + AI_CODING_GUIDELINES.md | 1 + banyand/internal/encoding/tag_encoder.go | 142 +++----- banyand/internal/encoding/tag_encoder_test.go | 42 ++- banyand/internal/sidx/block.go | 7 +- banyand/internal/sidx/part.go | 7 +- banyand/internal/sidx/query_result.go | 14 +- banyand/internal/sidx/tag.go | 9 - banyand/liaison/grpc/discovery.go | 90 +++-- banyand/liaison/grpc/server.go | 14 +- banyand/liaison/grpc/trace.go | 370 +++++++++++++++++++++ banyand/liaison/http/server.go | 2 + banyand/stream/tag.go | 37 ++- banyand/trace/block.go | 4 + banyand/trace/introducer.go | 8 +- banyand/trace/metadata.go | 149 ++++++++- banyand/trace/part.go | 4 + banyand/trace/query.go | 30 +- banyand/trace/svc_standalone.go | 99 ++---- banyand/trace/tag.go | 306 ++--------------- banyand/trace/tag_test.go | 347 ++++++++----------- banyand/trace/timestamp_test.go | 123 +++++++ banyand/trace/trace.go | 11 +- banyand/trace/trace_suite_test.go | 4 +- banyand/trace/traces.go | 2 +- banyand/trace/write_standalone.go | 10 +- pkg/cmdsetup/standalone.go | 6 + pkg/pb/v1/value.go | 42 +++ pkg/pb/v1/value_test.go | 9 + pkg/schema/init.go | 39 ++- pkg/test/setup/setup.go | 9 + pkg/test/trace/etcd.go | 109 ++++++ .../trace/testdata/groups/test-trace-group.json | 19 ++ .../trace/testdata/index_rule_bindings/sw.json | 17 + pkg/test/trace/testdata/index_rules/duration.json | 14 + pkg/test/trace/testdata/index_rules/timestamp.json | 14 + pkg/test/trace/testdata/traces/sw.json | 39 +++ test/cases/init.go | 4 + test/cases/trace/data/data.go | 206 ++++++++++++ test/cases/trace/data/input/all.yml | 20 ++ test/cases/trace/data/testdata/sw.json | 177 ++++++++++ 
test/cases/trace/data/want/all.yml | 48 +++ test/cases/trace/trace.go | 46 +++ .../standalone/query/query_suite_test.go | 4 + 44 files changed, 1900 insertions(+), 756 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 6fad3c15..be8bc5de 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -69,6 +69,8 @@ linters-settings: alias: streamv1 - pkg: github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1 alias: clusterv1 + - pkg: github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1 + alias: tracev1 - pkg: github.com/apache/skywalking-banyandb/pkg/pb/v1 alias: pbv1 lll: diff --git a/AI_CODING_GUIDELINES.md b/AI_CODING_GUIDELINES.md index 2f5b9c62..c02cdce8 100644 --- a/AI_CODING_GUIDELINES.md +++ b/AI_CODING_GUIDELINES.md @@ -49,6 +49,7 @@ Use these specific aliases for protobuf packages: - github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1 → measurev1 - github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1 → streamv1 - github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1 → clusterv1 +- github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1 → tracev1 - github.com/apache/skywalking-banyandb/pkg/pb/v1 → pbv1 ## ERROR HANDLING PATTERNS diff --git a/banyand/internal/encoding/tag_encoder.go b/banyand/internal/encoding/tag_encoder.go index 7859dd1d..78bcf0bf 100644 --- a/banyand/internal/encoding/tag_encoder.go +++ b/banyand/internal/encoding/tag_encoder.go @@ -22,7 +22,6 @@ package encoding import ( "github.com/apache/skywalking-banyandb/pkg/bytes" - "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -30,15 +29,10 @@ import ( "github.com/apache/skywalking-banyandb/pkg/pool" ) -const ( - defaultCompressionLevel = 3 -) - var ( int64SlicePool = pool.Register[*[]int64]("tag-encoder-int64Slice") float64SlicePool = 
pool.Register[*[]float64]("tag-encoder-float64Slice") dictionaryPool = pool.Register[*encoding.Dictionary]("tag-encoder-dictionary") - bigValuePool = bytes.NewBufferPool("tag-encoder-big-value") ) func generateInt64Slice(length int) *[]int64 { @@ -96,14 +90,11 @@ func releaseDictionary(d *encoding.Dictionary) { // For int64: uses delta encoding with first value storage. // For float64: converts to decimal integers with exponent, then delta encoding. // For other types: uses dictionary encoding, falls back to plain with zstd compression. -func EncodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, error) { +func EncodeTagValues(bb *bytes.Buffer, values [][]byte, valueType pbv1.ValueType) error { if len(values) == 0 { - return nil, nil + return nil } - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) - switch valueType { case pbv1.ValueTypeInt64: return encodeInt64TagValues(bb, values) @@ -115,29 +106,22 @@ func EncodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, error) } // DecodeTagValues decodes tag values based on the value type. -func DecodeTagValues(data []byte, valueType pbv1.ValueType, count int) ([][]byte, error) { - if len(data) == 0 { +func DecodeTagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, valueType pbv1.ValueType, count int) ([][]byte, error) { + if len(bb.Buf) == 0 { return nil, nil } - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) - bb.Buf = append(bb.Buf[:0], data...) 
- - decoder := &encoding.BytesBlockDecoder{} - values := make([][]byte, 0, count) - switch valueType { case pbv1.ValueTypeInt64: - return decodeInt64TagValues(decoder, bb, uint64(count)) + return decodeInt64TagValues(dst, decoder, bb, uint64(count)) case pbv1.ValueTypeFloat64: - return decodeFloat64TagValues(decoder, bb, uint64(count)) + return decodeFloat64TagValues(dst, decoder, bb, uint64(count)) default: - return decodeDefaultTagValues(decoder, bb, uint64(count), values) + return decodeDefaultTagValues(dst, decoder, bb, uint64(count)) } } -func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { +func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) error { intValuesPtr := generateInt64Slice(len(values)) intValues := *intValuesPtr defer releaseInt64Slice(intValuesPtr) @@ -147,11 +131,9 @@ func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { if v == nil || string(v) == "null" { // Handle null values by falling back to default encoding bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values) - // Apply zstd compression for plain encoding - compressed := zstd.Compress(nil, bb.Buf, defaultCompressionLevel) // Prepend EncodeTypePlain at the head of compressed data - result := append([]byte{byte(encoding.EncodeTypePlain)}, compressed...) - return result, nil + bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) 
+ return nil } if len(v) != 8 { logger.Panicf("invalid value length at index %d: expected 8 bytes, got %d", i, len(v)) @@ -167,14 +149,14 @@ func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { firstValueBytes := convert.Int64ToBytes(firstValue) // Prepend encodeType (1 byte) and firstValue (8 bytes) to the beginning - result := append( + bb.Buf = append( append([]byte{byte(encodeType)}, firstValueBytes...), bb.Buf..., ) - return result, nil + return nil } -func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { +func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) error { intValuesPtr := generateInt64Slice(len(values)) intValues := *intValuesPtr defer releaseInt64Slice(intValuesPtr) @@ -189,11 +171,9 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { if v == nil || string(v) == "null" { // Handle null values by falling back to default encoding bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values) - // Apply zstd compression for plain encoding - compressed := zstd.Compress(nil, bb.Buf, defaultCompressionLevel) // Prepend EncodeTypePlain at the head of compressed data - result := append([]byte{byte(encoding.EncodeTypePlain)}, compressed...) - return result, nil + bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) 
+ return nil } if len(v) != 8 { logger.Panicf("invalid value length at index %d: expected 8 bytes, got %d", i, len(v)) @@ -204,13 +184,10 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { intValues, exp, err := encoding.Float64ListToDecimalIntList(intValues[:0], floatValues) if err != nil { logger.Errorf("cannot convert Float64List to DecimalIntList: %v", err) - // Handle error by falling back to default encoding bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values) - // Apply zstd compression for plain encoding - compressed := zstd.Compress(nil, bb.Buf, defaultCompressionLevel) // Prepend EncodeTypePlain at the head of compressed data - result := append([]byte{byte(encoding.EncodeTypePlain)}, compressed...) - return result, nil + bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) + return nil } var firstValue int64 @@ -222,14 +199,14 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { expBytes := convert.Int16ToBytes(exp) // Prepend encodeType (1 byte), exp (2 bytes) and firstValue (8 bytes) to the beginning - result := append( + bb.Buf = append( append(append([]byte{byte(encodeType)}, expBytes...), firstValueBytes...), bb.Buf..., ) - return result, nil + return nil } -func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { +func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte) error { dict := generateDictionary() defer releaseDictionary(dict) @@ -238,19 +215,17 @@ func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte) ([]byte, error) { // Dictionary encoding failed, use plain encoding with zstd compression bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values) bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) 
- // Apply zstd compression for plain encoding - compressed := zstd.Compress(nil, bb.Buf, defaultCompressionLevel) - return compressed, nil + return nil } } // Dictionary encoding succeeded bb.Buf = dict.Encode(bb.Buf[:0]) bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...) - return append([]byte(nil), bb.Buf...), nil + return nil } -func decodeInt64TagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64) ([][]byte, error) { +func decodeInt64TagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64) ([][]byte, error) { intValuesPtr := generateInt64Slice(int(count)) intValues := *intValuesPtr defer releaseInt64Slice(intValuesPtr) @@ -263,21 +238,13 @@ func decodeInt64TagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, firstByte := encoding.EncodeType(bb.Buf[0]) if firstByte == encoding.EncodeTypePlain { - // This is compressed data with EncodeTypePlain at the head - // Skip the EncodeTypePlain byte and decompress the rest - compressedData := bb.Buf[1:] - decompressed, err := zstd.Decompress(nil, compressedData) - if err != nil { - logger.Panicf("cannot decompress data: %v", err) - } - // Decode the decompressed data - values := make([][]byte, 0, count) - values, decodeErr := decoder.Decode(values[:0], decompressed, count) + var decodeErr error + dst, decodeErr = decoder.Decode(dst[:0], bb.Buf[1:], count) if decodeErr != nil { logger.Panicf("cannot decode values: %v", decodeErr) } - return values, nil + return dst, nil } // Otherwise, this is int list data with EncodeType at the beginning @@ -296,14 +263,17 @@ func decodeInt64TagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, } // Convert int64 array to byte array - values := make([][]byte, count) + if len(dst) < len(intValues) { + dst = append(dst, make([][]byte, len(intValues)-len(dst))...) 
+ } + dst = dst[:len(intValues)] for i, v := range intValues { - values[i] = convert.Int64ToBytes(v) + dst[i] = convert.Int64ToBytes(v) } - return values, nil + return dst, nil } -func decodeFloat64TagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64) ([][]byte, error) { +func decodeFloat64TagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64) ([][]byte, error) { intValuesPtr := generateInt64Slice(int(count)) intValues := *intValuesPtr defer releaseInt64Slice(intValuesPtr) @@ -320,21 +290,12 @@ func decodeFloat64TagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffe firstByte := encoding.EncodeType(bb.Buf[0]) if firstByte == encoding.EncodeTypePlain { - // This is compressed data with EncodeTypePlain at the head - // Skip the EncodeTypePlain byte and decompress the rest - compressedData := bb.Buf[1:] - decompressed, err := zstd.Decompress(nil, compressedData) - if err != nil { - logger.Panicf("cannot decompress data: %v", err) - } - - // Decode the decompressed data - values := make([][]byte, 0, count) - values, decodeErr := decoder.Decode(values[:0], decompressed, count) + var decodeErr error + dst, decodeErr = decoder.Decode(dst[:0], bb.Buf[1:], count) if decodeErr != nil { logger.Panicf("cannot decode values: %v", decodeErr) } - return values, nil + return dst, nil } // Otherwise, this is float64 int list data with EncodeType at the beginning @@ -363,26 +324,19 @@ func decodeFloat64TagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffe } // Convert float64 array to byte array - values := make([][]byte, count) + if len(dst) < len(floatValues) { + dst = append(dst, make([][]byte, len(floatValues)-len(dst))...) 
+ } + dst = dst[:len(floatValues)] for i, v := range floatValues { - values[i] = convert.Float64ToBytes(v) + dst[i] = convert.Float64ToBytes(v) } - return values, nil + return dst, nil } -func decodeDefaultTagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64, values [][]byte) ([][]byte, error) { +func decodeDefaultTagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64) ([][]byte, error) { if len(bb.Buf) < 1 { - return values, nil - } - - // Check if this is zstd compressed data (no encode type prefix) - decompressed, decompErr := zstd.Decompress(nil, bb.Buf) - if decompErr == nil { - // Successfully decompressed, this is compressed data - bb.Buf = decompressed - if len(bb.Buf) < 1 { - return values, nil - } + return dst, nil } encodeType := encoding.EncodeType(bb.Buf[0]) @@ -392,15 +346,15 @@ func decodeDefaultTagValues(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffe case encoding.EncodeTypeDictionary: dict := generateDictionary() defer releaseDictionary(dict) - values, err = dict.Decode(values[:0], bb.Buf[1:], count) + dst, err = dict.Decode(dst[:0], bb.Buf[1:], count) case encoding.EncodeTypePlain: - values, err = decoder.Decode(values[:0], bb.Buf[1:], count) + dst, err = decoder.Decode(dst[:0], bb.Buf[1:], count) default: - values, err = decoder.Decode(values[:0], bb.Buf[1:], count) + dst, err = decoder.Decode(dst[:0], bb.Buf[1:], count) } if err != nil { logger.Panicf("cannot decode values: %v", err) } - return values, nil + return dst, nil } diff --git a/banyand/internal/encoding/tag_encoder_test.go b/banyand/internal/encoding/tag_encoder_test.go index a0b1a54d..52e736ed 100644 --- a/banyand/internal/encoding/tag_encoder_test.go +++ b/banyand/internal/encoding/tag_encoder_test.go @@ -23,7 +23,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/convert" + 
pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) @@ -56,11 +58,13 @@ func TestEncodeDecodeTagValues_Int64_WithNilValues(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - encoded, err := EncodeTagValues(tt.values, pbv1.ValueTypeInt64) + bb := &bytes.Buffer{} + err := EncodeTagValues(bb, tt.values, pbv1.ValueTypeInt64) require.NoError(t, err) - require.NotNil(t, encoded) + require.NotNil(t, bb.Buf) - decoded, err := DecodeTagValues(encoded, pbv1.ValueTypeInt64, len(tt.values)) + decoder := &pkgencoding.BytesBlockDecoder{} + decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64, len(tt.values)) require.NoError(t, err) require.Len(t, decoded, len(tt.values)) @@ -104,11 +108,13 @@ func TestEncodeDecodeTagValues_Int64_WithNullStringValues(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - encoded, err := EncodeTagValues(tt.values, pbv1.ValueTypeInt64) + bb := &bytes.Buffer{} + err := EncodeTagValues(bb, tt.values, pbv1.ValueTypeInt64) require.NoError(t, err) - require.NotNil(t, encoded) + require.NotNil(t, bb.Buf) - decoded, err := DecodeTagValues(encoded, pbv1.ValueTypeInt64, len(tt.values)) + decoder := &pkgencoding.BytesBlockDecoder{} + decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64, len(tt.values)) require.NoError(t, err) require.Len(t, decoded, len(tt.values)) @@ -133,11 +139,13 @@ func TestEncodeDecodeTagValues_Int64_MixedNilAndNullString(t *testing.T) { []byte("null"), } - encoded, err := EncodeTagValues(values, pbv1.ValueTypeInt64) + bb := &bytes.Buffer{} + err := EncodeTagValues(bb, values, pbv1.ValueTypeInt64) require.NoError(t, err) - require.NotNil(t, encoded) + require.NotNil(t, bb.Buf) - decoded, err := DecodeTagValues(encoded, pbv1.ValueTypeInt64, len(values)) + decoder := &pkgencoding.BytesBlockDecoder{} + decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64, 
len(values)) require.NoError(t, err) require.Len(t, decoded, len(values)) @@ -164,11 +172,13 @@ func TestEncodeDecodeTagValues_Int64_ValidValues(t *testing.T) { convert.Int64ToBytes(-9223372036854775808), // min int64 } - encoded, err := EncodeTagValues(values, pbv1.ValueTypeInt64) + bb := &bytes.Buffer{} + err := EncodeTagValues(bb, values, pbv1.ValueTypeInt64) require.NoError(t, err) - require.NotNil(t, encoded) + require.NotNil(t, bb.Buf) - decoded, err := DecodeTagValues(encoded, pbv1.ValueTypeInt64, len(values)) + decoder := &pkgencoding.BytesBlockDecoder{} + decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64, len(values)) require.NoError(t, err) require.Len(t, decoded, len(values)) @@ -178,11 +188,13 @@ func TestEncodeDecodeTagValues_Int64_ValidValues(t *testing.T) { } func TestEncodeDecodeTagValues_Int64_EmptyInput(t *testing.T) { - encoded, err := EncodeTagValues(nil, pbv1.ValueTypeInt64) + bb := &bytes.Buffer{} + err := EncodeTagValues(bb, nil, pbv1.ValueTypeInt64) require.NoError(t, err) - assert.Nil(t, encoded) + assert.Nil(t, bb.Buf) - decoded, err := DecodeTagValues(nil, pbv1.ValueTypeInt64, 0) + decoder := &pkgencoding.BytesBlockDecoder{} + decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64, 0) require.NoError(t, err) assert.Nil(t, decoded) } diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go index d740ef0b..77bc1c4c 100644 --- a/banyand/internal/sidx/block.go +++ b/banyand/internal/sidx/block.go @@ -23,6 +23,7 @@ import ( "fmt" "github.com/apache/skywalking-banyandb/api/common" + internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/encoding" @@ -278,15 +279,15 @@ func (b *block) mustWriteTag(tagName string, td *tagData, bm *blockMetadata, ww }() // Encode tag values using the encoding module - encodedData, err 
:= encodeTagValues(td.values, td.valueType) + err := internalencoding.EncodeTagValues(bb, td.values, td.valueType) if err != nil { panic(fmt.Sprintf("failed to encode tag values: %v", err)) } // Write tag data without compression tm.dataBlock.offset = tdw.bytesWritten - tm.dataBlock.size = uint64(len(encodedData)) - tdw.MustWrite(encodedData) + tm.dataBlock.size = uint64(len(bb.Buf)) + tdw.MustWrite(bb.Buf) // Write bloom filter if indexed if td.indexed && td.filter != nil { diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go index 6840b0e4..454fadcd 100644 --- a/banyand/internal/sidx/part.go +++ b/banyand/internal/sidx/part.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/apache/skywalking-banyandb/api/common" + internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" @@ -354,7 +355,7 @@ func (p *part) readAll() ([]*elements, error) { // Read tags for each tag name for tagName := range bm.tagsBlocks { - err = p.readBlockTags(tagName, bm, elems) + err = p.readBlockTags(tagName, bm, elems, blockBytesDecoder) if err != nil { releaseElements(elems) for _, e := range result { @@ -372,7 +373,7 @@ func (p *part) readAll() ([]*elements, error) { } // readBlockTags reads and decodes tag data for a specific tag in a block. 
-func (p *part) readBlockTags(tagName string, bm *blockMetadata, elems *elements) error { +func (p *part) readBlockTags(tagName string, bm *blockMetadata, elems *elements, decoder *encoding.BytesBlockDecoder) error { tagBlockInfo, exists := bm.tagsBlocks[tagName] if !exists { return fmt.Errorf("tag block info not found for tag: %s", tagName) @@ -405,7 +406,7 @@ func (p *part) readBlockTags(tagName string, bm *blockMetadata, elems *elements) fs.MustReadData(tdReader, int64(tm.dataBlock.offset), tdData) // Decode tag values directly (no compression) - tagValues, err := decodeTagValues(tdData, tm.valueType, int(bm.count)) + tagValues, err := internalencoding.DecodeTagValues(nil, decoder, &bytes.Buffer{Buf: tdData}, tm.valueType, int(bm.count)) if err != nil { return fmt.Errorf("cannot decode tag values: %w", err) } diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go index 87154640..fa3ddc15 100644 --- a/banyand/internal/sidx/query_result.go +++ b/banyand/internal/sidx/query_result.go @@ -25,6 +25,7 @@ import ( "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" + internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/cgroups" @@ -247,7 +248,7 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata continue // Skip missing tags } - if !qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, int(bm.count)) { + if !qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, int(bm.count), decoder) { // Continue loading other tags even if one fails continue } @@ -257,7 +258,7 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata } // loadTagData loads data for a specific tag, following the pattern from readBlockTags. 
-func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tagBlockInfo *dataBlock, count int) bool { +func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tagBlockInfo *dataBlock, count int, decoder *encoding.BytesBlockDecoder) bool { // Get tag metadata reader tmReader, tmExists := p.getTagMetadataReader(tagName) if !tmExists { @@ -302,18 +303,17 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tag bb2.Buf = bytes.ResizeOver(bb2.Buf[:0], int(tm.dataBlock.size)) fs.MustReadData(tdReader, int64(tm.dataBlock.offset), bb2.Buf) + // Create tag data structure and populate block + td := generateTagData() // Decode tag values directly (no compression) - tagValues, err := decodeTagValues(bb2.Buf, tm.valueType, count) + td.values, err = internalencoding.DecodeTagValues(td.values[:0], decoder, bb2, tm.valueType, count) if err != nil { return false } - // Create tag data structure and populate block - td := generateTagData() td.name = tagName td.valueType = tm.valueType td.indexed = tm.indexed - td.values = tagValues // Set min/max for int64 tags if tm.valueType == pbv1.ValueTypeInt64 { @@ -324,7 +324,7 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tag // Create bloom filter for indexed tags if needed if tm.indexed { td.filter = generateBloomFilter(count) - for _, value := range tagValues { + for _, value := range td.values { if value != nil { td.filter.Add(value) } diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go index abb6bc33..9a38f381 100644 --- a/banyand/internal/sidx/tag.go +++ b/banyand/internal/sidx/tag.go @@ -21,7 +21,6 @@ import ( "bytes" "fmt" - "github.com/apache/skywalking-banyandb/banyand/internal/encoding" pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/filter" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" @@ -186,14 +185,6 @@ func decodeBloomFilter(src []byte) 
(*filter.BloomFilter, error) { return bf, nil } -func encodeTagValues(values [][]byte, valueType pbv1.ValueType) ([]byte, error) { - return encoding.EncodeTagValues(values, valueType) -} - -func decodeTagValues(data []byte, valueType pbv1.ValueType, count int) ([][]byte, error) { - return encoding.DecodeTagValues(data, valueType, count) -} - // updateMinMax updates min/max values for int64 tags. func (td *tagData) updateMinMax() { if td.valueType != pbv1.ValueTypeInt64 || len(td.values) == 0 { diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index 859bd7b5..8df965ad 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -47,7 +47,12 @@ type discoveryService struct { } func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry, gr *groupRepo) *discoveryService { - er := &entityRepo{entitiesMap: make(map[identity]partition.Locator)} + er := &entityRepo{ + entitiesMap: make(map[identity]partition.Locator), + measureMap: make(map[identity]*databasev1.Measure), + traceMap: make(map[identity]*databasev1.Trace), + traceIDIndexMap: make(map[identity]int), + } return newDiscoveryServiceWithEntityRepo(kind, metadataRepo, nodeRegistry, gr, er) } @@ -184,9 +189,11 @@ var _ schema.EventHandler = (*entityRepo)(nil) type entityRepo struct { schema.UnimplementedOnInitHandler - log *logger.Logger - entitiesMap map[identity]partition.Locator - measureMap map[identity]*databasev1.Measure + log *logger.Logger + entitiesMap map[identity]partition.Locator + measureMap map[identity]*databasev1.Measure + traceMap map[identity]*databasev1.Trace + traceIDIndexMap map[identity]int // Cache trace ID tag index sync.RWMutex } @@ -195,20 +202,6 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { var l partition.Locator var id identity var modRevision int64 - switch schemaMetadata.Kind { - case schema.KindMeasure: - measure := schemaMetadata.Spec.(*databasev1.Measure) - 
modRevision = measure.GetMetadata().GetModRevision() - l = partition.NewEntityLocator(measure.TagFamilies, measure.Entity, modRevision) - id = getID(measure.GetMetadata()) - case schema.KindStream: - stream := schemaMetadata.Spec.(*databasev1.Stream) - modRevision = stream.GetMetadata().GetModRevision() - l = partition.NewEntityLocator(stream.TagFamilies, stream.Entity, modRevision) - id = getID(stream.GetMetadata()) - default: - return - } if le := e.log.Debug(); le.Enabled() { var kind string switch schemaMetadata.Kind { @@ -216,6 +209,8 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { kind = "measure" case schema.KindStream: kind = "stream" + case schema.KindTrace: + kind = "trace" default: kind = "unknown" } @@ -225,6 +220,36 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { Str("kind", kind). Msg("entity added or updated") } + switch schemaMetadata.Kind { + case schema.KindMeasure: + measure := schemaMetadata.Spec.(*databasev1.Measure) + modRevision = measure.GetMetadata().GetModRevision() + l = partition.NewEntityLocator(measure.TagFamilies, measure.Entity, modRevision) + id = getID(measure.GetMetadata()) + case schema.KindStream: + stream := schemaMetadata.Spec.(*databasev1.Stream) + modRevision = stream.GetMetadata().GetModRevision() + l = partition.NewEntityLocator(stream.TagFamilies, stream.Entity, modRevision) + id = getID(stream.GetMetadata()) + case schema.KindTrace: + trace := schemaMetadata.Spec.(*databasev1.Trace) + id = getID(trace.GetMetadata()) + e.traceMap[id] = trace + // Pre-compute trace ID tag index + traceIDTagName := trace.GetTraceIdTagName() + traceIDIndex := -1 + for i, tagSpec := range trace.GetTags() { + if tagSpec.GetName() == traceIDTagName { + traceIDIndex = i + break + } + } + e.traceIDIndexMap[id] = traceIDIndex + return + default: + return + } + e.RWMutex.Lock() defer e.RWMutex.Unlock() e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, ModRevision: modRevision} @@ 
-246,6 +271,9 @@ func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) { case schema.KindStream: stream := schemaMetadata.Spec.(*databasev1.Stream) id = getID(stream.GetMetadata()) + case schema.KindTrace: + trace := schemaMetadata.Spec.(*databasev1.Trace) + id = getID(trace.GetMetadata()) default: return } @@ -256,6 +284,8 @@ func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) { kind = "measure" case schema.KindStream: kind = "stream" + case schema.KindTrace: + kind = "trace" default: kind = "unknown" } @@ -268,7 +298,9 @@ func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) { e.RWMutex.Lock() defer e.RWMutex.Unlock() delete(e.entitiesMap, id) - delete(e.measureMap, id) // Ensure measure is not stored for streams + delete(e.measureMap, id) // Clean up measure + delete(e.traceMap, id) // Clean up trace + delete(e.traceIDIndexMap, id) // Clean up trace ID index } func (e *entityRepo) getLocator(id identity) (partition.Locator, bool) { @@ -281,6 +313,26 @@ func (e *entityRepo) getLocator(id identity) (partition.Locator, bool) { return el, true } +func (e *entityRepo) getTrace(id identity) (*databasev1.Trace, bool) { + e.RWMutex.RLock() + defer e.RWMutex.RUnlock() + trace, ok := e.traceMap[id] + if !ok { + return nil, false + } + return trace, true +} + +func (e *entityRepo) getTraceIDIndex(id identity) (int, bool) { + e.RWMutex.RLock() + defer e.RWMutex.RUnlock() + index, ok := e.traceIDIndexMap[id] + if !ok { + return -1, false + } + return index, true +} + var _ schema.EventHandler = (*shardingKeyRepo)(nil) type shardingKeyRepo struct { diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index ecc84fd2..f8d0c704 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -40,6 +40,7 @@ import ( measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" "github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" @@ -97,6 +98,7 @@ type server struct { *topNAggregationRegistryServer *groupRegistryServer *traceRegistryServer + traceSVC *traceService authReloader *auth.Reloader metrics *metrics keyFile string @@ -130,11 +132,17 @@ func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Clie pipeline: tir1Client, broadcaster: broadcaster, } + traceSVC := &traceService{ + discoveryService: newDiscoveryService(schema.KindTrace, schemaRegistry, nr.StreamLiaisonNodeRegistry, gr), + pipeline: tir1Client, + broadcaster: broadcaster, + } s := &server{ omr: omr, streamSVC: streamSVC, measureSVC: measureSVC, + traceSVC: traceSVC, groupRepo: gr, streamRegistryServer: &streamRegistryServer{ schemaRegistry: schemaRegistry, @@ -169,7 +177,7 @@ func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Clie schemaRepo: schemaRegistry, authReloader: auth.InitAuthReloader(), } - s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC} + s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC, traceSVC} return s } @@ -178,10 +186,12 @@ func (s *server) PreRun(_ context.Context) error { s.log = logger.GetLogger("liaison-grpc") s.streamSVC.setLogger(s.log.Named("stream-t1")) s.measureSVC.setLogger(s.log) + s.traceSVC.setLogger(s.log.Named("trace")) s.propertyServer.SetLogger(s.log) components := []*discoveryService{ s.streamSVC.discoveryService, s.measureSVC.discoveryService, + s.traceSVC.discoveryService, s.propertyServer.discoveryService, } s.schemaRepo.RegisterHandler("liaison", schema.KindGroup, s.groupRepo) @@ -208,6 +218,7 @@ func (s *server) PreRun(_ context.Context) error { s.metrics = metrics 
s.streamSVC.metrics = metrics s.measureSVC.metrics = metrics + s.traceSVC.metrics = metrics s.propertyServer.metrics = metrics s.streamRegistryServer.metrics = metrics s.indexRuleBindingRegistryServer.metrics = metrics @@ -339,6 +350,7 @@ func (s *server) Serve() run.StopNotify { commonv1.RegisterServiceServer(s.ser, &apiVersionService{}) streamv1.RegisterStreamServiceServer(s.ser, s.streamSVC) measurev1.RegisterMeasureServiceServer(s.ser, s.measureSVC) + tracev1.RegisterTraceServiceServer(s.ser, s.traceSVC) databasev1.RegisterGroupRegistryServiceServer(s.ser, s.groupRegistryServer) databasev1.RegisterIndexRuleBindingRegistryServiceServer(s.ser, s.indexRuleBindingRegistryServer) databasev1.RegisterIndexRuleRegistryServiceServer(s.ser, s.indexRuleRegistryServer) diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go new file mode 100644 index 00000000..599d054c --- /dev/null +++ b/banyand/liaison/grpc/trace.go @@ -0,0 +1,370 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package grpc + +import ( + "context" + "hash/fnv" + "io" + "time" + + "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/api/data" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/accesslog" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/query" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +type traceService struct { + tracev1.UnimplementedTraceServiceServer + ingestionAccessLog accesslog.Log + pipeline queue.Client + broadcaster queue.Client + *discoveryService + l *logger.Logger + metrics *metrics + writeTimeout time.Duration + maxWaitDuration time.Duration +} + +func (s *traceService) setLogger(log *logger.Logger) { + s.l = log +} + +func (s *traceService) activeIngestionAccessLog(root string, sampled bool) (err error) { + if s.ingestionAccessLog, err = accesslog. 
+ NewFileLog(root, "trace-ingest-%s", 10*time.Minute, s.log, sampled); err != nil { + return err + } + return nil +} + +func (s *traceService) validateTimestamp(writeEntity *tracev1.WriteRequest) error { + // Get trace schema from entityRepo + id := getID(writeEntity.GetMetadata()) + traceEntity, existed := s.entityRepo.getTrace(id) + if !existed { + return errors.New("trace schema not found") + } + + timestampTagName := traceEntity.GetTimestampTagName() + for _, tag := range writeEntity.GetTags() { + if tag.GetTimestamp() != nil { + if err := timestamp.CheckPb(tag.GetTimestamp()); err != nil { + s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the timestamp is invalid") + return err + } + return nil + } + } + + return errors.New("timestamp tag not found: " + timestampTagName) +} + +func (s *traceService) validateMetadata(writeEntity *tracev1.WriteRequest) error { + if writeEntity.Metadata.ModRevision > 0 { + traceCache, existed := s.entityRepo.getTrace(getID(writeEntity.GetMetadata())) + if !existed { + return errors.New("trace schema not found") + } + if writeEntity.Metadata.ModRevision != traceCache.GetMetadata().GetModRevision() { + return errors.New("expired trace schema") + } + } + return nil +} + +func (s *traceService) extractTraceID(tags []*modelv1.TagValue, traceIDIndex int) (string, error) { + if len(tags) == 0 { + return "", errors.New("no tags found") + } + + if traceIDIndex < 0 || traceIDIndex >= len(tags) { + return "", errors.New("trace ID tag index out of range") + } + + tag := tags[traceIDIndex] + switch v := tag.GetValue().(type) { + case *modelv1.TagValue_Str: + return v.Str.GetValue(), nil + case *modelv1.TagValue_BinaryData: + return string(v.BinaryData), nil + default: + return "", errors.New("trace ID must be string or binary data") + } +} + +func (s *traceService) getTraceShardID(writeEntity *tracev1.WriteRequest) (common.ShardID, error) { + // Get shard count from group configuration + shardCount, existed := 
s.groupRepo.shardNum(writeEntity.GetMetadata().GetGroup()) + if !existed { + return 0, errors.New("group not found or no shard configuration") + } + + // Get cached trace ID index from entityRepo + id := getID(writeEntity.GetMetadata()) + traceIDIndex, existed := s.entityRepo.getTraceIDIndex(id) + if !existed { + return 0, errors.New("trace schema not found") + } + + if traceIDIndex == -1 { + return 0, errors.New("trace ID tag not found in schema") + } + + traceID, err := s.extractTraceID(writeEntity.GetTags(), traceIDIndex) + if err != nil { + return 0, err + } + + // Calculate shard ID using hash of trace ID + hasher := fnv.New32a() + hasher.Write([]byte(traceID)) + hash := hasher.Sum32() + + return common.ShardID(hash % shardCount), nil +} + +func (s *traceService) getTraceShardIDWithRetry(writeEntity *tracev1.WriteRequest) (common.ShardID, error) { + if s.maxWaitDuration > 0 { + retryInterval := 10 * time.Millisecond + startTime := time.Now() + for { + shardID, err := s.getTraceShardID(writeEntity) + if err == nil || !errors.Is(err, errNotExist) || time.Since(startTime) > s.maxWaitDuration { + return shardID, err + } + time.Sleep(retryInterval) + retryInterval = time.Duration(float64(retryInterval) * 1.5) + if retryInterval > time.Second { + retryInterval = time.Second + } + } + } + return s.getTraceShardID(writeEntity) +} + +func (s *traceService) publishMessages( + ctx context.Context, + publisher queue.BatchPublisher, + writeEntity *tracev1.WriteRequest, + shardID common.ShardID, +) ([]string, error) { + iwr := &tracev1.InternalWriteRequest{ + ShardId: uint32(shardID), + Request: writeEntity, + } + nodeID, err := s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), writeEntity.GetMetadata().GetName(), uint32(shardID), 0) + if err != nil { + return nil, err + } + + message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) + if _, err := publisher.Publish(ctx, data.TopicTraceWrite, message); err != nil { + return nil, err 
+ } + return []string{nodeID}, nil +} + +func (s *traceService) Write(stream tracev1.TraceService_WriteServer) error { + reply := func(metadata *commonv1.Metadata, status modelv1.Status, version uint64, stream tracev1.TraceService_WriteServer, logger *logger.Logger) { + if status != modelv1.Status_STATUS_SUCCEED { + s.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, "trace", "write") + } + s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "trace", "write") + if errResp := stream.Send(&tracev1.WriteResponse{Metadata: metadata, Status: status.String(), Version: version}); errResp != nil { + if dl := logger.Debug(); dl.Enabled() { + dl.Err(errResp).Msg("failed to send trace write response") + } + s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "trace", "write") + } + } + + s.metrics.totalStreamStarted.Inc(1, "trace", "write") + publisher := s.pipeline.NewBatchPublisher(s.writeTimeout) + start := time.Now() + var succeedSent []succeedSentMessage + requestCount := 0 + defer func() { + cee, err := publisher.Close() + for _, ssm := range succeedSent { + code := modelv1.Status_STATUS_SUCCEED + if cee != nil { + for _, node := range ssm.nodes { + if ce, ok := cee[node]; ok { + code = ce.Status() + break + } + } + } + reply(ssm.metadata, code, ssm.messageID, stream, s.l) + } + if err != nil { + s.l.Error().Err(err).Msg("failed to close the publisher") + } + if dl := s.l.Debug(); dl.Enabled() { + dl.Int("total_requests", requestCount).Msg("completed trace write batch") + } + s.metrics.totalStreamFinished.Inc(1, "trace", "write") + s.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), "trace", "write") + }() + + ctx := stream.Context() + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + writeEntity, err := stream.Recv() + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + s.l.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to receive message") + } + return err + } + + requestCount++ + s.metrics.totalStreamMsgReceived.Inc(1, writeEntity.Metadata.Group, "trace", "write") + + if err = s.validateTimestamp(writeEntity); err != nil { + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream, s.l) + continue + } + + if err = s.validateMetadata(writeEntity); err != nil { + status := modelv1.Status_STATUS_INTERNAL_ERROR + if errors.Is(err, errors.New("trace schema not found")) { + status = modelv1.Status_STATUS_NOT_FOUND + } else if errors.Is(err, errors.New("expired trace schema")) { + status = modelv1.Status_STATUS_EXPIRED_SCHEMA + } + s.l.Error().Err(err).Stringer("written", writeEntity).Msg("metadata validation failed") + reply(writeEntity.GetMetadata(), status, writeEntity.GetVersion(), stream, s.l) + continue + } + + shardID, err := s.getTraceShardIDWithRetry(writeEntity) + if err != nil { + s.l.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("trace sharding failed") + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream, s.l) + continue + } + + if s.ingestionAccessLog != nil { + if errAL := s.ingestionAccessLog.Write(writeEntity); errAL != nil { + s.l.Error().Err(errAL).Msg("failed to write ingestion access log") + } + } + + nodes, err := s.publishMessages(ctx, publisher, writeEntity, shardID) + if err != nil { + s.l.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("publishing failed") + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream, s.l) + continue + } + + succeedSent = append(succeedSent, succeedSentMessage{ + metadata: writeEntity.GetMetadata(), + messageID: writeEntity.GetVersion(), + nodes: nodes, + }) + } +} + +var emptyTraceQueryResponse = &tracev1.QueryResponse{Spans: make([]*tracev1.Span, 0)} + +func (s *traceService) Query(ctx context.Context, req 
*tracev1.QueryRequest) (resp *tracev1.QueryResponse, err error) { + for _, g := range req.Groups { + s.metrics.totalStarted.Inc(1, g, "trace", "query") + } + start := time.Now() + defer func() { + for _, g := range req.Groups { + s.metrics.totalFinished.Inc(1, g, "trace", "query") + if err != nil { + s.metrics.totalErr.Inc(1, g, "trace", "query") + } + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "trace", "query") + } + }() + timeRange := req.GetTimeRange() + if timeRange == nil { + req.TimeRange = timestamp.DefaultTimeRange + } + if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) + } + now := time.Now() + if req.Trace { + tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) + span, _ := tracer.StartSpan(ctx, "trace-grpc") + span.Tag("request", convert.BytesToString(logger.Proto(req))) + defer func() { + if err != nil { + span.Error(err) + } else { + span.AddSubTrace(resp.TraceQueryResult) + resp.TraceQueryResult = tracer.ToProto() + } + span.Stop() + }() + } + message := bus.NewMessage(bus.MessageID(now.UnixNano()), req) + feat, errQuery := s.broadcaster.Publish(ctx, data.TopicTraceQuery, message) + if errQuery != nil { + if errors.Is(errQuery, io.EOF) { + return emptyTraceQueryResponse, nil + } + return nil, errQuery + } + msg, errFeat := feat.Get() + if errFeat != nil { + return nil, errFeat + } + data := msg.Data() + switch d := data.(type) { + case *tracev1.QueryResponse: + return d, nil + case *common.Error: + return nil, errors.WithMessage(errQueryMsg, d.Error()) + } + return nil, nil +} + +func (s *traceService) Close() error { + if s.ingestionAccessLog != nil { + return s.ingestionAccessLog.Close() + } + return nil +} diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index 20bac794..d5259514 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -43,6 +43,7 @@ 
import ( measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" "github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth" "github.com/apache/skywalking-banyandb/pkg/healthcheck" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -359,6 +360,7 @@ func (p *server) initGRPCClient() error { measurev1.RegisterMeasureServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, p.grpcAddr, opts), propertyv1.RegisterPropertyServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, p.grpcAddr, opts), databasev1.RegisterTraceRegistryServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, p.grpcAddr, opts), + tracev1.RegisterTraceServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux, p.grpcAddr, opts), ) if err != nil { return errors.Wrap(err, "failed to register endpoints") diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go index f1751313..d78bcb89 100644 --- a/banyand/stream/tag.go +++ b/banyand/stream/tag.go @@ -18,7 +18,7 @@ package stream import ( - "github.com/apache/skywalking-banyandb/banyand/internal/encoding" + internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/pkg/bytes" pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" @@ -61,18 +61,21 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, tagFilterWriter *w tm.name = t.name tm.valueType = t.valueType + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + // Use shared encoding module - encodedData, err := encoding.EncodeTagValues(t.values, t.valueType) + err := internalencoding.EncodeTagValues(bb, t.values, t.valueType) if err != nil { logger.Panicf("failed to encode tag values: %v", err) } - tm.size = uint64(len(encodedData)) 
+ tm.size = uint64(len(bb.Buf)) if tm.size > maxValuesBlockSize { logger.Panicf("too large valuesSize: %d bytes; mustn't exceed %d bytes", tm.size, maxValuesBlockSize) } tm.offset = tagWriter.bytesWritten - tagWriter.MustWrite(encodedData) + tagWriter.MustWrite(bb.Buf) if t.filter != nil { bb := bigValuePool.Generate() @@ -88,7 +91,7 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, tagFilterWriter *w } } -func (t *tag) mustReadValues(_ *pkgencoding.BytesBlockDecoder, reader fs.Reader, cm tagMetadata, count uint64) { +func (t *tag) mustReadValues(decoder *pkgencoding.BytesBlockDecoder, reader fs.Reader, cm tagMetadata, count uint64) { t.name = cm.name t.valueType = cm.valueType if t.valueType == pbv1.ValueTypeUnknown { @@ -103,18 +106,20 @@ func (t *tag) mustReadValues(_ *pkgencoding.BytesBlockDecoder, reader fs.Reader, logger.Panicf("%s: block size cannot exceed %d bytes; got %d bytes", reader.Path(), maxValuesBlockSize, valuesSize) } - data := make([]byte, valuesSize) - fs.MustReadData(reader, int64(cm.offset), data) - // Use shared decoding module - decodedValues, err := encoding.DecodeTagValues(data, t.valueType, int(count)) + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(valuesSize)) + fs.MustReadData(reader, int64(cm.offset), bb.Buf) + + var err error + t.values, err = internalencoding.DecodeTagValues(t.values, decoder, bb, t.valueType, int(count)) if err != nil { logger.Panicf("%s: failed to decode tag values: %v", reader.Path(), err) } - t.values = decodedValues } -func (t *tag) mustSeqReadValues(_ *pkgencoding.BytesBlockDecoder, reader *seqReader, cm tagMetadata, count uint64) { +func (t *tag) mustSeqReadValues(decoder *pkgencoding.BytesBlockDecoder, reader *seqReader, cm tagMetadata, count uint64) { t.name = cm.name t.valueType = cm.valueType if cm.offset != reader.bytesRead { @@ -125,15 +130,17 @@ func (t *tag) mustSeqReadValues(_ *pkgencoding.BytesBlockDecoder, reader *seqRea 
logger.Panicf("%s: block size cannot exceed %d bytes; got %d bytes", reader.Path(), maxValuesBlockSize, valuesSize) } - data := make([]byte, valuesSize) - reader.mustReadFull(data) + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(valuesSize)) + reader.mustReadFull(bb.Buf) // Use shared decoding module - decodedValues, err := encoding.DecodeTagValues(data, t.valueType, int(count)) + var err error + t.values, err = internalencoding.DecodeTagValues(t.values, decoder, bb, t.valueType, int(count)) if err != nil { logger.Panicf("%s: failed to decode tag values: %v", reader.Path(), err) } - t.values = decodedValues } var bigValuePool = bytes.NewBufferPool("stream-big-value") diff --git a/banyand/trace/block.go b/banyand/trace/block.go index 1fc28c26..48dd3c30 100644 --- a/banyand/trace/block.go +++ b/banyand/trace/block.go @@ -159,10 +159,12 @@ func (b *block) unmarshalTag(decoder *encoding.BytesBlockDecoder, i int, if err != nil { logger.Panicf("%s: cannot unmarshal tagMetadata: %v", metaReader.Path(), err) } + tm.name = name bigValuePool.Release(bb) b.tags[i].name = name if valueType, ok := tagType[name]; ok { b.tags[i].valueType = valueType + tm.valueType = valueType } else { b.tags[i].valueType = pbv1.ValueTypeUnknown for j := range b.tags[i].values { @@ -199,6 +201,8 @@ func (b *block) unmarshalTagFromSeqReaders(decoder *encoding.BytesBlockDecoder, sort.Strings(keys) b.tags[i].name = keys[i] b.tags[i].valueType = tagType[keys[i]] + tm.name = keys[i] + tm.valueType = tagType[keys[i]] b.tags[i].mustSeqReadValues(decoder, valueReader, *tm, uint64(b.Len())) } diff --git a/banyand/trace/introducer.go b/banyand/trace/introducer.go index a67869de..809820c9 100644 --- a/banyand/trace/introducer.go +++ b/banyand/trace/introducer.go @@ -32,7 +32,7 @@ func (i *introduction) reset() { i.applied = nil } -var introductionPool = pool.Register[*introduction]("stream-introduction") +var introductionPool = 
pool.Register[*introduction]("trace-introduction") func generateIntroduction() *introduction { v := introductionPool.Get() @@ -60,7 +60,7 @@ func (i *flusherIntroduction) reset() { i.applied = nil } -var flusherIntroductionPool = pool.Register[*flusherIntroduction]("stream-flusher-introduction") +var flusherIntroductionPool = pool.Register[*flusherIntroduction]("trace-flusher-introduction") func generateFlusherIntroduction() *flusherIntroduction { v := flusherIntroductionPool.Get() @@ -94,7 +94,7 @@ func (i *mergerIntroduction) reset() { i.creator = 0 } -var mergerIntroductionPool = pool.Register[*mergerIntroduction]("stream-merger-introduction") +var mergerIntroductionPool = pool.Register[*mergerIntroduction]("trace-merger-introduction") func generateMergerIntroduction() *mergerIntroduction { v := mergerIntroductionPool.Get() @@ -122,7 +122,7 @@ func (i *syncIntroduction) reset() { i.applied = nil } -var syncIntroductionPool = pool.Register[*syncIntroduction]("stream-sync-introduction") +var syncIntroductionPool = pool.Register[*syncIntroduction]("trace-sync-introduction") func generateSyncIntroduction() *syncIntroduction { v := syncIntroductionPool.Get() diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go index 0d3c9cf0..b44836bf 100644 --- a/banyand/trace/metadata.go +++ b/banyand/trace/metadata.go @@ -20,10 +20,12 @@ package trace import ( "context" "fmt" + "path" "time" "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/api/validate" @@ -34,7 +36,9 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/banyand/queue/pub" 
"github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -91,7 +95,10 @@ func newLiaisonSchemaRepo(path string, svc *liaison, traceDataNodeRegistry grpc. } func (sr *schemaRepo) start() { - sr.l.Info().Str("path", sr.path).Msg("starting trace metadata repository") + sr.Watcher() + sr.metadata. + RegisterHandler("trace", schema.KindGroup|schema.KindTrace|schema.KindIndexRuleBinding|schema.KindIndexRule, + sr) } func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) (*trace, bool) { @@ -103,8 +110,16 @@ func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) (*trace, bool) { return t, ok } -func (sr *schemaRepo) GetRemovalSegmentsTimeRange(_ string) *timestamp.TimeRange { - panic("not implemented") +func (sr *schemaRepo) GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRange { + g, ok := sr.LoadGroup(group) + if !ok { + return nil + } + db := g.SupplyTSDB() + if db == nil { + return nil + } + return db.(storage.TSDB[*tsTable, option]).GetExpiredSegmentsTimeRange() } func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) { @@ -260,6 +275,15 @@ type supplier struct { } func newSupplier(path string, svc *standalone, nodeLabels map[string]string) *supplier { + if svc.pm == nil { + svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier") + } + opt := svc.option + opt.protector = svc.pm + + if opt.protector == nil { + svc.l.Panic().Msg("CRITICAL: opt.protector is still nil after assignment") + } return &supplier{ metadata: svc.metadata, omr: svc.omr, @@ -267,7 +291,7 @@ func newSupplier(path string, svc *standalone, nodeLabels map[string]string) *su l: svc.l, nodeLabels: nodeLabels, path: path, - option: svc.option, + option: opt, } } @@ -282,8 +306,64 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.Resourc return 
s.metadata.TraceRegistry().GetTrace(ctx, md) } -func (s *supplier) OpenDB(_ *commonv1.Group) (resourceSchema.DB, error) { - panic("not implemented") +func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error) { + name := groupSchema.Metadata.Group + p := common.Position{ + Module: "trace", + Database: name, + } + ro := groupSchema.ResourceOpts + if ro == nil { + return nil, fmt.Errorf("no resource opts in group %s", name) + } + shardNum := ro.ShardNum + ttl := ro.Ttl + segInterval := ro.SegmentInterval + segmentIdleTimeout := time.Duration(0) + if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 { + var ttlNum uint32 + for _, st := range ro.Stages { + if st.Ttl.Unit != ro.Ttl.Unit { + return nil, fmt.Errorf("ttl unit %s is not consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit) + } + selector, err := pub.ParseLabelSelector(st.NodeSelector) + if err != nil { + return nil, errors.WithMessagef(err, "failed to parse node selector %s", st.NodeSelector) + } + ttlNum += st.Ttl.Num + if !selector.Matches(s.nodeLabels) { + continue + } + ttl.Num += ttlNum + shardNum = st.ShardNum + segInterval = st.SegmentInterval + if st.Close { + segmentIdleTimeout = 5 * time.Minute + } + break + } + } + group := groupSchema.Metadata.Group + opts := storage.TSDBOpts[*tsTable, option]{ + ShardNum: shardNum, + Location: path.Join(s.path, group), + TSTableCreator: newTSTable, + TableMetrics: s.newMetrics(p), + SegmentInterval: storage.MustToIntervalRule(segInterval), + TTL: storage.MustToIntervalRule(ttl), + Option: s.option, + SeriesIndexFlushTimeoutSeconds: s.option.flushTimeout.Nanoseconds() / int64(time.Second), + SeriesIndexCacheMaxBytes: int(s.option.seriesCacheMaxSize), + StorageMetricsFactory: s.omr.With(traceScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), p.DBLabelValues()))), + SegmentIdleTimeout: segmentIdleTimeout, + MemoryLimit: s.pm.GetLimit(), + } + return storage.OpenTSDB( + common.SetPosition(context.Background(), func(_ common.Position) 
common.Position { + return p + }), + opts, nil, group, + ) } // queueSupplier is the supplier for liaison service. @@ -299,6 +379,15 @@ type queueSupplier struct { } func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry grpc.NodeRegistry) *queueSupplier { + if svc.pm == nil { + svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier") + } + opt := svc.option + opt.protector = svc.pm + + if opt.protector == nil { + svc.l.Panic().Msg("CRITICAL: opt.protector is still nil after assignment") + } return &queueSupplier{ metadata: svc.metadata, omr: svc.omr, @@ -306,7 +395,7 @@ func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry grpc.Node traceDataNodeRegistry: traceDataNodeRegistry, l: svc.l, path: path, - option: svc.option, + option: opt, } } @@ -321,6 +410,48 @@ func (qs *queueSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.R return qs.metadata.TraceRegistry().GetTrace(ctx, md) } -func (qs *queueSupplier) OpenDB(_ *commonv1.Group) (resourceSchema.DB, error) { - panic("not implemented") +func (qs *queueSupplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error) { + name := groupSchema.Metadata.Group + p := common.Position{ + Module: "trace", + Database: name, + } + ro := groupSchema.ResourceOpts + if ro == nil { + return nil, fmt.Errorf("no resource opts in group %s", name) + } + shardNum := ro.ShardNum + group := groupSchema.Metadata.Group + opts := wqueue.Opts[*tsTable, option]{ + Group: group, + ShardNum: shardNum, + SegmentInterval: storage.MustToIntervalRule(ro.SegmentInterval), + Location: path.Join(qs.path, group), + Option: qs.option, + Metrics: qs.newMetrics(p), + SubQueueCreator: newWriteQueue, + GetNodes: func(shardID common.ShardID) []string { + copies := ro.Replicas + 1 + nodeSet := make(map[string]struct{}, copies) + for i := uint32(0); i < copies; i++ { + nodeID, err := qs.traceDataNodeRegistry.Locate(group, "", uint32(shardID), i) + if err != nil { + qs.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate node") + return nil + } + nodeSet[nodeID] = struct{}{} + } + nodes := make([]string, 0, len(nodeSet)) + for nodeID := range nodeSet { + nodes = append(nodes, nodeID) + } + return nodes + }, + } + return wqueue.Open( + common.SetPosition(context.Background(), func(_ common.Position) common.Position { + return p + }), + opts, group, + ) } diff --git a/banyand/trace/part.go b/banyand/trace/part.go index 0bd9d5cd..ee61e8c6 100644 --- a/banyand/trace/part.go +++ b/banyand/trace/part.go @@ -484,6 +484,10 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) { mp.partMetadata.mustWriteMetadata(fileSystem, path) mp.tagType.mustWriteTagType(fileSystem, path) mp.traceIDFilter.mustWriteTraceIDFilter(fileSystem, path) + if mp.traceIDFilter.filter != nil { + releaseBloomFilter(mp.traceIDFilter.filter) + mp.traceIDFilter.filter = nil + } fileSystem.SyncPath(path) } diff --git a/banyand/trace/query.go b/banyand/trace/query.go index df81f15c..7be69cbe 100644 --- a/banyand/trace/query.go +++ b/banyand/trace/query.go @@ -24,8 +24,8 @@ import ( "sort" "github.com/pkg/errors" + "google.golang.org/protobuf/types/known/timestamppb" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/convert" @@ -310,17 +310,6 @@ func (qr *queryResult) merge() *model.TraceResult { return result } -func mustEncodeTagValue(name string, tagType databasev1.TagType, tagValue *modelv1.TagValue, num int) [][]byte { - values := make([][]byte, num) - tv := encodeTagValue(name, tagType, tagValue) - defer releaseTagValue(tv) - value := tv.marshal() - for i := 0; i < num; i++ { - values[i] = value - } - return values -} - func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue { if 
value == nil { return pbv1.NullTagValue @@ -351,6 +340,12 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValu values = append(values, string(bb.Buf)) } return strArrTagValue(values) + case pbv1.ValueTypeTimestamp: + // Convert 64-bit nanoseconds since epoch back to protobuf timestamp + epochNanos := convert.BytesToInt64(value) + seconds := epochNanos / 1e9 + nanos := int32(epochNanos % 1e9) + return timestampTagValue(seconds, nanos) default: logger.Panicf("unsupported value type: %v", valueType) return nil @@ -406,3 +401,14 @@ func strArrTagValue(values []string) *modelv1.TagValue { }, } } + +func timestampTagValue(seconds int64, nanos int32) *modelv1.TagValue { + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Timestamp{ + Timestamp: &timestamppb.Timestamp{ + Seconds: seconds, + Nanos: nanos, + }, + }, + } +} diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go index a2c101bd..05e4e858 100644 --- a/banyand/trace/svc_standalone.go +++ b/banyand/trace/svc_standalone.go @@ -21,11 +21,15 @@ import ( "context" "path" "path/filepath" + "strings" "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/api/data" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/protector" @@ -62,11 +66,6 @@ type standalone struct { maxFileSnapshotNum int } -// StandaloneService returns a new standalone service.
-func StandaloneService(_ context.Context) (Service, error) { - return &standalone{}, nil -} - func (s *standalone) FlagSet() *run.FlagSet { fs := run.NewFlagSet("trace") fs.StringVar(&s.root, "trace-root-path", "/tmp", "the root path for trace data") @@ -93,41 +92,38 @@ func (s *standalone) Role() databasev1.Role { return databasev1.Role_ROLE_DATA } -func (s *standalone) PreRun(_ context.Context) error { - s.l = logger.GetLogger("trace") - - // Initialize metadata - if s.metadata == nil { - return errors.New("metadata repo is required") - } - - // Initialize filesystem - if s.lfs == nil { - s.lfs = fs.NewLocalFileSystem() - } - - // Initialize protector - if s.pm == nil { - return errors.New("memory protector is required") +func (s *standalone) PreRun(ctx context.Context) error { + s.l = logger.GetLogger(s.Name()) + s.l.Info().Msg("memory protector is initialized in PreRun") + s.lfs = fs.NewLocalFileSystemWithLoggerAndLimit(s.l, s.pm.GetLimit()) + path := path.Join(s.root, s.Name()) + s.snapshotDir = filepath.Join(path, storage.SnapshotsDir) + observability.UpdatePath(path) + val := ctx.Value(common.ContextNodeKey) + if val == nil { + return errors.New("node id is empty") } - - // Initialize pipeline - if s.pipeline == nil { - return errors.New("pipeline is required") - } - - // Set up data path + node := val.(common.Node) if s.dataPath == "" { - s.dataPath = path.Join(s.root, "trace-data") + s.dataPath = filepath.Join(path, storage.DataDir) } - - // Initialize schema repository - var nodeLabels map[string]string - s.schemaRepo = newSchemaRepo(s.dataPath, s, nodeLabels) + if !strings.HasPrefix(filepath.VolumeName(s.dataPath), filepath.VolumeName(path)) { + observability.UpdatePath(s.dataPath) + } + s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels) // Initialize snapshot directory s.snapshotDir = filepath.Join(s.dataPath, "snapshot") + // Set up write callback handler + if s.pipeline != nil { + writeListener := setUpWriteCallback(s.l, &s.schemaRepo, 
s.maxDiskUsagePercent) + err := s.pipeline.Subscribe(data.TopicTraceWrite, writeListener) + if err != nil { + return err + } + } + s.l.Info(). Str("root", s.root). Str("dataPath", s.dataPath). @@ -138,12 +134,7 @@ func (s *standalone) PreRun(_ context.Context) error { } func (s *standalone) Serve() run.StopNotify { - // As specified in the plan, no pipeline listeners should be implemented - s.l.Info().Msg("trace standalone service started") - - // Return a channel that never closes since this service runs indefinitely - stopCh := make(chan struct{}) - return stopCh + return s.schemaRepo.StopCh() } func (s *standalone) GracefulStop() { @@ -169,36 +160,6 @@ func (s *standalone) GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRa return s.schemaRepo.GetRemovalSegmentsTimeRange(group) } -// SetMetadata sets the metadata repository. -func (s *standalone) SetMetadata(metadata metadata.Repo) { - s.metadata = metadata -} - -// SetObservabilityRegistry sets the observability metrics registry. -func (s *standalone) SetObservabilityRegistry(omr observability.MetricsRegistry) { - s.omr = omr -} - -// SetProtector sets the memory protector. -func (s *standalone) SetProtector(pm protector.Memory) { - s.pm = pm -} - -// SetPipeline sets the pipeline server. -func (s *standalone) SetPipeline(pipeline queue.Server) { - s.pipeline = pipeline -} - -// SetLocalPipeline sets the local pipeline queue. -func (s *standalone) SetLocalPipeline(localPipeline queue.Queue) { - s.localPipeline = localPipeline -} - -// SetFileSystem sets the file system. -func (s *standalone) SetFileSystem(lfs fs.FileSystem) { - s.lfs = lfs -} - // NewService returns a new service. 
func NewService(metadata metadata.Repo, pipeline queue.Server, omr observability.MetricsRegistry, pm protector.Memory) (Service, error) { return &standalone{ diff --git a/banyand/trace/tag.go b/banyand/trace/tag.go index 1510c997..bd576ece 100644 --- a/banyand/trace/tag.go +++ b/banyand/trace/tag.go @@ -18,70 +18,12 @@ package trace import ( + internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/pkg/bytes" - "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/encoding" + pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" - "github.com/apache/skywalking-banyandb/pkg/pool" -) - -func generateInt64Slice(length int) *[]int64 { - v := int64SlicePool.Get() - if v == nil { - s := make([]int64, length) - return &s - } - if cap(*v) < length { - *v = make([]int64, length) - } else { - *v = (*v)[:length] - } - return v -} - -func releaseInt64Slice(int64Slice *[]int64) { - *int64Slice = (*int64Slice)[:0] - int64SlicePool.Put(int64Slice) -} - -func generateFloat64Slice(length int) *[]float64 { - v := float64SlicePool.Get() - if v == nil { - s := make([]float64, length) - return &s - } - if cap(*v) < length { - *v = make([]float64, length) - } else { - *v = (*v)[:length] - } - return v -} - -func releaseFloat64Slice(float64Slice *[]float64) { - *float64Slice = (*float64Slice)[:0] - float64SlicePool.Put(float64Slice) -} - -func generateDictionary() *encoding.Dictionary { - v := dictionaryPool.Get() - if v == nil { - return &encoding.Dictionary{} - } - return v -} - -func releaseDictionary(d *encoding.Dictionary) { - d.Reset() - dictionaryPool.Put(d) -} - -var ( - int64SlicePool = pool.Register[*[]int64]("trace-int64Slice") - float64SlicePool = pool.Register[*[]float64]("trace-float64Slice") - 
dictionaryPool = pool.Register[*encoding.Dictionary]("trace-dictionary") ) type tag struct { @@ -116,18 +58,14 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) { tm.name = t.name tm.valueType = t.valueType + // Use shared encoding module bb := bigValuePool.Generate() defer bigValuePool.Release(bb) - - // select encoding based on data type - switch t.valueType { - case pbv1.ValueTypeInt64: - t.encodeInt64Tag(bb) - case pbv1.ValueTypeFloat64: - t.encodeFloat64Tag(bb) - default: - t.encodeDefault(bb) + err := internalencoding.EncodeTagValues(bb, t.values, t.valueType) + if err != nil { + logger.Panicf("failed to encode tag values: %v", err) } + tm.size = uint64(len(bb.Buf)) if tm.size > maxValuesBlockSize { logger.Panicf("too large valuesSize: %d bytes; mustn't exceed %d bytes", tm.size, maxValuesBlockSize) @@ -136,123 +74,37 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) { tagWriter.MustWrite(bb.Buf) } -func (t *tag) encodeInt64Tag(bb *bytes.Buffer) { - // convert byte array to int64 array - intValuesPtr := generateInt64Slice(len(t.values)) - intValues := *intValuesPtr - defer releaseInt64Slice(intValuesPtr) - var encodeType encoding.EncodeType - - for i, v := range t.values { - if v == nil || string(v) == "null" { - t.encodeDefault(bb) - encodeType = encoding.EncodeTypePlain - // Prepend encodeType (1 byte) to the beginning - bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...) 
- return - } - if len(v) != 8 { - logger.Panicf("invalid value length at index %d: expected 8 bytes, got %d", i, len(v)) - } - intValues[i] = convert.BytesToInt64(v) - } - // use delta encoding for integer column - var firstValue int64 - bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], intValues) - if encodeType == encoding.EncodeTypeUnknown { - logger.Panicf("invalid encode type for int64 values") - } - firstValueBytes := convert.Int64ToBytes(firstValue) - // Prepend encodeType (1 byte) and firstValue (8 bytes) to the beginning - bb.Buf = append( - append([]byte{byte(encodeType)}, firstValueBytes...), - bb.Buf..., - ) -} - -func (t *tag) encodeFloat64Tag(bb *bytes.Buffer) { - // convert byte array to float64 array - intValuesPtr := generateInt64Slice(len(t.values)) - intValues := *intValuesPtr - defer releaseInt64Slice(intValuesPtr) - - floatValuesPtr := generateFloat64Slice(len(t.values)) - floatValues := *floatValuesPtr - defer releaseFloat64Slice(floatValuesPtr) - - var encodeType encoding.EncodeType - - doEncodeDefault := func() { - t.encodeDefault(bb) - encodeType = encoding.EncodeTypePlain - // Prepend encodeType (1 byte) to the beginning - bb.Buf = append([]byte{byte(encodeType)}, bb.Buf...) 
- } - - for i, v := range t.values { - if v == nil || string(v) == "null" { - doEncodeDefault() - return - } - if len(v) != 8 { - logger.Panicf("invalid value length at index %d: expected 8 bytes, got %d", i, len(v)) - } - floatValues[i] = convert.BytesToFloat64(v) - } - intValues, exp, err := encoding.Float64ListToDecimalIntList(intValues[:0], floatValues) - if err != nil { - logger.Errorf("cannot convert Float64List to DecimalIntList : %v", err) - doEncodeDefault() - return - } - var firstValue int64 - bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], intValues) - if encodeType == encoding.EncodeTypeUnknown { - logger.Panicf("invalid encode type for int64 values") - } - firstValueBytes := convert.Int64ToBytes(firstValue) - expBytes := convert.Int16ToBytes(exp) - // Prepend encodeType (1 byte), exp (2 bytes) and firstValue (8 bytes) to the beginning - bb.Buf = append( - append(append([]byte{byte(encodeType)}, expBytes...), firstValueBytes...), - bb.Buf..., - ) -} - -func (t *tag) encodeDefault(bb *bytes.Buffer) { - dict := generateDictionary() - defer releaseDictionary(dict) - for _, v := range t.values { - if !dict.Add(v) { - bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], t.values) - bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, bb.Buf...) - return - } - } - bb.Buf = dict.Encode(bb.Buf[:0]) - bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...) 
-} - -func (t *tag) mustReadValues(decoder *encoding.BytesBlockDecoder, reader fs.Reader, cm tagMetadata, count uint64) { +func (t *tag) mustReadValues(decoder *pkgencoding.BytesBlockDecoder, reader fs.Reader, cm tagMetadata, count uint64) { + t.name = cm.name + t.valueType = cm.valueType if t.valueType == pbv1.ValueTypeUnknown { - for i := uint64(0); i < count; i++ { + for range count { t.values = append(t.values, nil) } return } - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) valuesSize := cm.size if valuesSize > maxValuesBlockSize { logger.Panicf("%s: block size cannot exceed %d bytes; got %d bytes", reader.Path(), maxValuesBlockSize, valuesSize) } - bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize)) + + bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) + bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(valuesSize)) fs.MustReadData(reader, int64(cm.offset), bb.Buf) - t.decodeTagValues(decoder, reader.Path(), count, bb) + + // Use shared decoding module + var err error + t.values, err = internalencoding.DecodeTagValues(t.values, decoder, bb, t.valueType, int(count)) + if err != nil { + logger.Panicf("%s: failed to decode tag values: %v", reader.Path(), err) + } } -func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *seqReader, cm tagMetadata, count uint64) { +func (t *tag) mustSeqReadValues(decoder *pkgencoding.BytesBlockDecoder, reader *seqReader, cm tagMetadata, count uint64) { + t.name = cm.name + t.valueType = cm.valueType if cm.offset != reader.bytesRead { logger.Panicf("%s: offset mismatch: %d vs %d", reader.Path(), cm.offset, reader.bytesRead) } @@ -263,114 +115,14 @@ func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *seq bb := bigValuePool.Generate() defer bigValuePool.Release(bb) - - bb.Buf = bytes.ResizeOver(bb.Buf, int(valuesSize)) + bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(valuesSize)) reader.mustReadFull(bb.Buf) - t.decodeTagValues(decoder, reader.Path(), count, bb) -} - -func 
(t *tag) decodeTagValues(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - switch t.valueType { - case pbv1.ValueTypeInt64: - t.decodeInt64Tag(decoder, path, count, bb) - case pbv1.ValueTypeFloat64: - t.decodeFloat64Tag(decoder, path, count, bb) - default: - t.decodeDefault(decoder, bb, count, path) - } -} - -func (t *tag) decodeInt64Tag(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - // decode integer type - intValuesPtr := generateInt64Slice(int(count)) - intValues := *intValuesPtr - defer releaseInt64Slice(intValuesPtr) - - if len(bb.Buf) < 1 { - logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", 1, len(bb.Buf)) - } - encodeType := encoding.EncodeType(bb.Buf[0]) - if encodeType == encoding.EncodeTypePlain { - bb.Buf = bb.Buf[1:] - t.decodeDefault(decoder, bb, count, path) - return - } - - const expectedLen = 9 - if len(bb.Buf) < expectedLen { - logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", expectedLen, len(bb.Buf)) - } - firstValue := convert.BytesToInt64(bb.Buf[1:9]) - bb.Buf = bb.Buf[9:] - var err error - intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, int(count)) - if err != nil { - logger.Panicf("%s: cannot decode int values: %v", path, err) - } - // convert int64 array to byte array - t.values = make([][]byte, count) - for i, v := range intValues { - t.values[i] = convert.Int64ToBytes(v) - } -} - -func (t *tag) decodeFloat64Tag(decoder *encoding.BytesBlockDecoder, path string, count uint64, bb *bytes.Buffer) { - // decode float type - intValuesPtr := generateInt64Slice(int(count)) - intValues := *intValuesPtr - defer releaseInt64Slice(intValuesPtr) - floatValuesPtr := generateFloat64Slice(int(count)) - floatValues := *floatValuesPtr - defer releaseFloat64Slice(floatValuesPtr) - - if len(bb.Buf) < 1 { - logger.Panicf("bb.Buf length too short: expect at least %d bytes, but 
got %d bytes", 1, len(bb.Buf)) - } - encodeType := encoding.EncodeType(bb.Buf[0]) - if encodeType == encoding.EncodeTypePlain { - bb.Buf = bb.Buf[1:] - t.decodeDefault(decoder, bb, count, path) - return - } - const expectedLen = 11 - if len(bb.Buf) < expectedLen { - logger.Panicf("bb.Buf length too short: expect at least %d bytes, but got %d bytes", expectedLen, len(bb.Buf)) - } - exp := convert.BytesToInt16(bb.Buf[1:3]) - firstValue := convert.BytesToInt64(bb.Buf[3:11]) - bb.Buf = bb.Buf[11:] + // Use shared decoding module var err error - intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, int(count)) - if err != nil { - logger.Panicf("%s: cannot decode int values: %v", path, err) - } - floatValues, err = encoding.DecimalIntListToFloat64List(floatValues[:0], intValues, exp, int(count)) - if err != nil { - logger.Panicf("cannot convert DecimalIntList to Float64List: %v", err) - } - if uint64(len(floatValues)) != count { - logger.Panicf("unexpected floatValues length: got %d, expected %d", len(floatValues), count) - } - // convert float64 array to byte array - t.values = make([][]byte, count) - for i, v := range floatValues { - t.values[i] = convert.Float64ToBytes(v) - } -} - -func (t *tag) decodeDefault(decoder *encoding.BytesBlockDecoder, bb *bytes.Buffer, count uint64, path string) { - encodeType := encoding.EncodeType(bb.Buf[0]) - var err error - if encodeType == encoding.EncodeTypeDictionary { - dict := generateDictionary() - defer releaseDictionary(dict) - t.values, err = dict.Decode(t.values[:0], bb.Buf[1:], count) - } else { - t.values, err = decoder.Decode(t.values[:0], bb.Buf[1:], count) - } + t.values, err = internalencoding.DecodeTagValues(t.values, decoder, bb, t.valueType, int(count)) if err != nil { - logger.Panicf("%s: cannot decode values: %v", path, err) + logger.Panicf("%s: failed to decode tag values: %v", reader.Path(), err) } } diff --git a/banyand/trace/tag_test.go b/banyand/trace/tag_test.go index 
7d20eda5..52376c87 100644 --- a/banyand/trace/tag_test.go +++ b/banyand/trace/tag_test.go @@ -18,223 +18,158 @@ package trace import ( - "fmt" "testing" + "time" "github.com/stretchr/testify/assert" - "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/banyand/internal/encoding" + pkgbytes "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/encoding" + pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) -func TestTag_reset(t *testing.T) { - tt := &tag{ - name: "test", - valueType: pbv1.ValueTypeStr, - values: [][]byte{[]byte("value1"), []byte("value2")}, - } - - tt.reset() - - assert.Equal(t, "", tt.name) - assert.Equal(t, 0, len(tt.values)) -} - -func TestTag_resizeValues(t *testing.T) { - tt := &tag{ - values: make([][]byte, 2, 5), - } - - values := tt.resizeValues(3) - assert.Equal(t, 3, len(values)) - assert.Equal(t, 5, cap(values)) - - values = tt.resizeValues(6) - assert.Equal(t, 6, len(values)) - assert.True(t, cap(values) >= 6) // The capacity is at least 6, but could be more +func timeToBytes(t time.Time) []byte { + return pkgencoding.Int64ToBytes(nil, t.UnixNano()) } -func TestTag_mustWriteTo_mustReadValues(t *testing.T) { - tests := []struct { - tag *tag - name string - }{ - { - name: "string with nils", - tag: &tag{ - name: "test", - valueType: pbv1.ValueTypeStr, - values: [][]byte{[]byte("value1"), nil, []byte("value2"), nil}, +func TestTagEncodingDecoding(t *testing.T) { + t.Run("test int64 tag encoding/decoding", func(t *testing.T) { + tag := &tag{ + name: "test_int64", + valueType: pbv1.ValueTypeInt64, + values: [][]byte{ + convert.Int64ToBytes(100), + convert.Int64ToBytes(200), + convert.Int64ToBytes(300), }, - }, - { - name: "int64 with null", - tag: &tag{ - name: "test", - valueType: pbv1.ValueTypeInt64, - values: [][]byte{[]byte("null"), nil, 
[]byte("null"), nil}, + } + + // Test encoding + bb := &pkgbytes.Buffer{} + err := encoding.EncodeTagValues(bb, tag.values, tag.valueType) + assert.NoError(t, err) + assert.NotNil(t, bb.Buf) + assert.Greater(t, len(bb.Buf), 0) + + // Test decoding + decoder := &pkgencoding.BytesBlockDecoder{} + decodedValues, err := encoding.DecodeTagValues(nil, decoder, bb, tag.valueType, len(tag.values)) + assert.NoError(t, err) + assert.Equal(t, len(tag.values), len(decodedValues)) + assert.Equal(t, tag.values, decodedValues) + }) + + t.Run("test float64 tag encoding/decoding", func(t *testing.T) { + tag := &tag{ + name: "test_float64", + valueType: pbv1.ValueTypeFloat64, + values: [][]byte{ + convert.Float64ToBytes(1.5), + convert.Float64ToBytes(2.5), + convert.Float64ToBytes(3.5), }, - }, - { - name: "valid int64 values", - tag: &tag{ - name: "test", - valueType: pbv1.ValueTypeInt64, - values: [][]byte{ - convert.Int64ToBytes(1), - convert.Int64ToBytes(2), - convert.Int64ToBytes(4), - convert.Int64ToBytes(5), - }, + } + + // Test encoding + bb := &pkgbytes.Buffer{} + err := encoding.EncodeTagValues(bb, tag.values, tag.valueType) + assert.NoError(t, err) + assert.NotNil(t, bb.Buf) + assert.Greater(t, len(bb.Buf), 0) + + // Test decoding + decoder := &pkgencoding.BytesBlockDecoder{} + decodedValues, err := encoding.DecodeTagValues(nil, decoder, bb, tag.valueType, len(tag.values)) + assert.NoError(t, err) + assert.Equal(t, len(tag.values), len(decodedValues)) + assert.Equal(t, tag.values, decodedValues) + }) + + t.Run("test string tag encoding/decoding", func(t *testing.T) { + tag := &tag{ + name: "test_string", + valueType: pbv1.ValueTypeStr, + values: [][]byte{ + []byte("value1"), + []byte("value2"), + []byte("value3"), }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tm := &tagMetadata{} - - buf, filterBuf := &bytes.Buffer{}, &bytes.Buffer{} - w, fw := &writer{}, &writer{} - w.init(buf) - fw.init(filterBuf) - - tt.tag.mustWriteTo(tm, w) - 
assert.Equal(t, w.bytesWritten, tm.size) - assert.Equal(t, uint64(len(buf.Buf)), tm.size) - assert.Equal(t, uint64(0), tm.offset) - assert.Equal(t, tt.tag.name, tm.name) - assert.Equal(t, tt.tag.valueType, tm.valueType) - - decoder := &encoding.BytesBlockDecoder{} - unmarshaled := &tag{} - unmarshaled.name = tm.name - unmarshaled.valueType = tm.valueType - unmarshaled.mustReadValues(decoder, buf, *tm, uint64(len(tt.tag.values))) - assert.Equal(t, tt.tag.values, unmarshaled.values) - }) - } -} - -func TestTag_HighCardinalityStringEncoding(t *testing.T) { - tests := []struct { - name string - description string - expectedEncType encoding.EncodeType - uniqueCount int - totalCount int - }{ - { - name: "exactly 256 unique values - should use dictionary", - uniqueCount: 256, - totalCount: 256, - expectedEncType: encoding.EncodeTypeDictionary, - description: "Dictionary encoding should be used when exactly at the threshold", - }, - { - name: "257 unique values - should use plain encoding", - uniqueCount: 257, - totalCount: 257, - expectedEncType: encoding.EncodeTypePlain, - description: "Plain encoding should be used when exceeding dictionary threshold", - }, - { - name: "300 unique values - should use plain encoding", - uniqueCount: 300, - totalCount: 300, - expectedEncType: encoding.EncodeTypePlain, - description: "Plain encoding should be used for high cardinality strings", - }, - { - name: "1000 unique values - should use plain encoding", - uniqueCount: 1000, - totalCount: 1000, - expectedEncType: encoding.EncodeTypePlain, - description: "Plain encoding should be used for very high cardinality", - }, - { - name: "500 total with 200 unique - should use dictionary", - uniqueCount: 200, - totalCount: 500, - expectedEncType: encoding.EncodeTypeDictionary, - description: "Dictionary should be used when unique count is below threshold despite high total count", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Generate unique string values - 
values := make([][]byte, tt.totalCount) - - // Create unique values up to uniqueCount - for i := 0; i < tt.uniqueCount; i++ { - values[i] = []byte(fmt.Sprintf("unique_value_%06d", i)) - } - - // If totalCount > uniqueCount, repeat some values to reach totalCount - for i := tt.uniqueCount; i < tt.totalCount; i++ { - // Repeat values cyclically - repeatIndex := i % tt.uniqueCount - values[i] = []byte(fmt.Sprintf("unique_value_%06d", repeatIndex)) - } - - testTag := &tag{ - name: "high_cardinality_tag", - valueType: pbv1.ValueTypeStr, - values: values, - } - - // Encode the tag - tm := &tagMetadata{} - buf, filterBuf := &bytes.Buffer{}, &bytes.Buffer{} - w, fw := &writer{}, &writer{} - w.init(buf) - fw.init(filterBuf) - - testTag.mustWriteTo(tm, w) - - // Verify basic metadata - assert.Equal(t, w.bytesWritten, tm.size) - assert.Equal(t, uint64(len(buf.Buf)), tm.size) - assert.Equal(t, uint64(0), tm.offset) - assert.Equal(t, testTag.name, tm.name) - assert.Equal(t, testTag.valueType, tm.valueType) - - // Check encoding type by examining the first byte of the encoded data - assert.True(t, len(buf.Buf) > 0, "Encoded buffer should not be empty") - actualEncType := encoding.EncodeType(buf.Buf[0]) - assert.Equal(t, tt.expectedEncType, actualEncType, - "Expected %s encoding (%d), got %d. 
%s", - getEncodeTypeName(tt.expectedEncType), tt.expectedEncType, actualEncType, tt.description) - - // Test roundtrip: decode and verify all values are preserved - decoder := &encoding.BytesBlockDecoder{} - unmarshaled := &tag{} - unmarshaled.name = tm.name - unmarshaled.valueType = tm.valueType - unmarshaled.mustReadValues(decoder, buf, *tm, uint64(len(testTag.values))) - - assert.Equal(t, len(testTag.values), len(unmarshaled.values), "Number of values should match") - - // Verify all values are correctly decoded - for i, originalValue := range testTag.values { - assert.Equal(t, originalValue, unmarshaled.values[i], - "Value at index %d should match original", i) - } - }) - } -} - -// Helper function to get encode type name for better test output. -func getEncodeTypeName(encType encoding.EncodeType) string { - switch encType { - case encoding.EncodeTypePlain: - return "Plain" - case encoding.EncodeTypeDictionary: - return "Dictionary" - default: - return fmt.Sprintf("Unknown(%d)", encType) - } + } + + // Test encoding + bb := &pkgbytes.Buffer{} + err := encoding.EncodeTagValues(bb, tag.values, tag.valueType) + assert.NoError(t, err) + assert.NotNil(t, bb.Buf) + assert.Greater(t, len(bb.Buf), 0) + + // Test decoding + decoder := &pkgencoding.BytesBlockDecoder{} + decodedValues, err := encoding.DecodeTagValues(nil, decoder, bb, tag.valueType, len(tag.values)) + assert.NoError(t, err) + assert.Equal(t, len(tag.values), len(decodedValues)) + assert.Equal(t, tag.values, decodedValues) + }) + + t.Run("test timestamp tag encoding/decoding", func(t *testing.T) { + // Create test timestamps + time1 := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC) + time2 := time.Date(2023, 1, 2, 12, 0, 0, 0, time.UTC) + time3 := time.Date(2023, 1, 3, 12, 0, 0, 0, time.UTC) + + tag := &tag{ + name: "test_timestamp", + valueType: pbv1.ValueTypeTimestamp, + values: [][]byte{ + timeToBytes(time1), + timeToBytes(time2), + timeToBytes(time3), + }, + } + + // Test encoding + bb := 
&pkgbytes.Buffer{} + err := encoding.EncodeTagValues(bb, tag.values, tag.valueType) + assert.NoError(t, err) + assert.NotNil(t, bb.Buf) + assert.Greater(t, len(bb.Buf), 0) + + // Test decoding + decoder := &pkgencoding.BytesBlockDecoder{} + decodedValues, err := encoding.DecodeTagValues(nil, decoder, bb, tag.valueType, len(tag.values)) + assert.NoError(t, err) + assert.Equal(t, len(tag.values), len(decodedValues)) + assert.Equal(t, tag.values, decodedValues) + + // Verify the decoded timestamp values can be converted back to time.Time + for i, decodedValue := range decodedValues { + expectedTime := []time.Time{time1, time2, time3}[i] + decodedNanos := pkgencoding.BytesToInt64(decodedValue) + decodedTime := time.Unix(0, decodedNanos) + assert.Equal(t, expectedTime.Unix(), decodedTime.Unix()) + // Note: We compare Unix time (seconds) since nanoseconds might have precision differences + } + }) + + t.Run("test empty values", func(t *testing.T) { + tag := &tag{ + name: "test_empty", + valueType: pbv1.ValueTypeStr, + values: [][]byte{}, + } + + // Test encoding + bb := &pkgbytes.Buffer{} + err := encoding.EncodeTagValues(bb, tag.values, tag.valueType) + assert.NoError(t, err) + assert.Nil(t, bb.Buf) + + // Test decoding + decoder := &pkgencoding.BytesBlockDecoder{} + decodedValues, err := encoding.DecodeTagValues(nil, decoder, bb, tag.valueType, 0) + assert.NoError(t, err) + assert.Nil(t, decodedValues) + }) } diff --git a/banyand/trace/timestamp_test.go b/banyand/trace/timestamp_test.go new file mode 100644 index 00000000..b35a7f33 --- /dev/null +++ b/banyand/trace/timestamp_test.go @@ -0,0 +1,123 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package trace + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" +) + +func TestTimestampTagValueEncodingDecoding(t *testing.T) { + // Test timestamp encoding + t.Run("test timestamp tag encoding", func(t *testing.T) { + // Create a test timestamp + testTime := time.Date(2023, 1, 1, 12, 0, 0, 123456789, time.UTC) + timestampProto := timestamppb.New(testTime) + + tagValue := &modelv1.TagValue{ + Value: &modelv1.TagValue_Timestamp{ + Timestamp: timestampProto, + }, + } + + // Encode the timestamp + encoded := encodeTagValue("test_timestamp", databasev1.TagType_TAG_TYPE_TIMESTAMP, tagValue) + + // Verify encoding + assert.Equal(t, pbv1.ValueTypeTimestamp, encoded.valueType) + assert.Equal(t, "test_timestamp", encoded.tag) + assert.NotNil(t, encoded.value) + + // The encoded value should be 8 bytes (int64 nanoseconds) + assert.Equal(t, 8, len(encoded.value)) + + // Verify the encoded value represents the correct nanoseconds + expectedNanos := testTime.UnixNano() + // Use the same conversion function to decode + 
decodedNanos := convert.BytesToInt64(encoded.value) + assert.Equal(t, expectedNanos, decodedNanos) + }) + + // Test timestamp decoding + t.Run("test timestamp tag decoding", func(t *testing.T) { + // Create test timestamp + testTime := time.Date(2023, 1, 1, 12, 0, 0, 123456789, time.UTC) + expectedNanos := testTime.UnixNano() + + // Create encoded bytes using the same conversion function + encodedBytes := convert.Int64ToBytes(expectedNanos) + + // Decode the timestamp + decoded := mustDecodeTagValue(pbv1.ValueTypeTimestamp, encodedBytes) + + // Verify decoding + assert.NotNil(t, decoded) + assert.NotNil(t, decoded.GetTimestamp()) + + decodedTime := decoded.GetTimestamp() + assert.Equal(t, testTime.Unix(), decodedTime.Seconds) + assert.Equal(t, int32(123456789), decodedTime.Nanos) + }) + + // Test round-trip encoding and decoding + t.Run("test timestamp round-trip", func(t *testing.T) { + // Create a test timestamp + testTime := time.Date(2023, 1, 1, 12, 0, 0, 123456789, time.UTC) + timestampProto := timestamppb.New(testTime) + + tagValue := &modelv1.TagValue{ + Value: &modelv1.TagValue_Timestamp{ + Timestamp: timestampProto, + }, + } + + // Encode + encoded := encodeTagValue("test_timestamp", databasev1.TagType_TAG_TYPE_TIMESTAMP, tagValue) + + // Decode + decoded := mustDecodeTagValue(pbv1.ValueTypeTimestamp, encoded.value) + + // Verify round-trip + assert.NotNil(t, decoded) + assert.NotNil(t, decoded.GetTimestamp()) + + decodedTime := decoded.GetTimestamp() + assert.Equal(t, testTime.Unix(), decodedTime.Seconds) + assert.Equal(t, int32(123456789), decodedTime.Nanos) + }) + + // Test nil timestamp handling + t.Run("test nil timestamp handling", func(t *testing.T) { + // Test encoding nil timestamp + encoded := encodeTagValue("test_timestamp", databasev1.TagType_TAG_TYPE_TIMESTAMP, nil) + assert.Equal(t, pbv1.ValueTypeTimestamp, encoded.valueType) + assert.Nil(t, encoded.value) + + // Test decoding nil value + decoded := mustDecodeTagValue(pbv1.ValueTypeTimestamp, 
nil) + assert.Equal(t, pbv1.NullTagValue, decoded) + }) +} diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index b8812ee0..ed33c4f0 100644 --- a/banyand/trace/trace.go +++ b/banyand/trace/trace.go @@ -48,12 +48,11 @@ const ( var traceScope = observability.RootScope.SubScope("trace") type option struct { - mergePolicy *mergePolicy - protector protector.Memory - tire2Client queue.Client - seriesCacheMaxSize run.Bytes - flushTimeout time.Duration - elementIndexFlushTimeout time.Duration + mergePolicy *mergePolicy + protector protector.Memory + tire2Client queue.Client + seriesCacheMaxSize run.Bytes + flushTimeout time.Duration } // Service allows inspecting the trace data. diff --git a/banyand/trace/trace_suite_test.go b/banyand/trace/trace_suite_test.go index b64c502e..05140094 100644 --- a/banyand/trace/trace_suite_test.go +++ b/banyand/trace/trace_suite_test.go @@ -33,7 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" - teststream "github.com/apache/skywalking-banyandb/pkg/test/stream" + testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace" ) func TestTrace(t *testing.T) { @@ -57,7 +57,7 @@ func (p *preloadTraceService) Name() string { } func (p *preloadTraceService) PreRun(ctx context.Context) error { - return teststream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) + return testtrace.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } type services struct { diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go index b166a9c3..93361ce2 100644 --- a/banyand/trace/traces.go +++ b/banyand/trace/traces.go @@ -157,7 +157,7 @@ func (t *traces) reset() { t.timestamps = t.timestamps[:0] for i := range t.tags { for j := range t.tags[i] { - t.tags[i][j].reset() + releaseTagValue(t.tags[i][j]) } } t.tags = t.tags[:0] diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go index 5825983e..1ed6f464 
100644 --- a/banyand/trace/write_standalone.go +++ b/banyand/trace/write_standalone.go @@ -186,7 +186,7 @@ func processTraces(schemaRepo *schemaRepo, traces *traces, writeEvent *tracev1.I } is := stm.indexSchema.Load().(indexSchema) - if len(is.indexRuleLocators) != len(stm.GetSchema().GetTags()) { + if len(is.indexRuleLocators) > len(stm.GetSchema().GetTags()) { return fmt.Errorf("metadata crashed, tag rule length %d, tag length %d", len(is.indexRuleLocators), len(stm.GetSchema().GetTags())) } @@ -302,6 +302,14 @@ func encodeTagValue(name string, tagType databasev1.TagType, tagVal *modelv1.Tag for i := range tagVal.GetStrArray().Value { tv.valueArr[i] = []byte(tagVal.GetStrArray().Value[i]) } + case databasev1.TagType_TAG_TYPE_TIMESTAMP: + tv.valueType = pbv1.ValueTypeTimestamp + if tagVal.GetTimestamp() != nil { + // Convert timestamp to 64-bit nanoseconds since epoch for efficient storage + ts := tagVal.GetTimestamp() + epochNanos := ts.Seconds*1e9 + int64(ts.Nanos) + tv.value = convert.Int64ToBytes(epochNanos) + } default: logger.Panicf("unsupported tag value type: %T", tagVal.GetValue()) } diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go index 90b9293e..fb6c2028 100644 --- a/pkg/cmdsetup/standalone.go +++ b/pkg/cmdsetup/standalone.go @@ -35,6 +35,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/banyand/trace" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/version" @@ -58,6 +59,10 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate stream service") } + traceSvc, err := trace.NewService(metaSvc, dataPipeline, metricSvc, pm) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate trace service") + } var 
srvMetrics *grpcprom.ServerMetrics srvMetrics.UnaryServerInterceptor() srvMetrics.UnaryServerInterceptor() @@ -88,6 +93,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { propertySvc, measureSvc, streamSvc, + traceSvc, q, grpcServer, httpServer, diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go index f13fc161..9d2eea50 100644 --- a/pkg/pb/v1/value.go +++ b/pkg/pb/v1/value.go @@ -23,6 +23,7 @@ import ( "strconv" "github.com/pkg/errors" + "google.golang.org/protobuf/types/known/timestamppb" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -43,6 +44,7 @@ const ( ValueTypeBinaryData ValueTypeStrArr ValueTypeInt64Arr + ValueTypeTimestamp ) // MustTagValueToValueType converts modelv1.TagValue to ValueType. @@ -60,6 +62,8 @@ func MustTagValueToValueType(tag *modelv1.TagValue) ValueType { return ValueTypeStrArr case *modelv1.TagValue_IntArray: return ValueTypeInt64Arr + case *modelv1.TagValue_Timestamp: + return ValueTypeTimestamp default: panic("unknown tag value type") } @@ -78,6 +82,8 @@ func MustTagValueSpecToValueType(tag databasev1.TagType) ValueType { return ValueTypeStrArr case databasev1.TagType_TAG_TYPE_INT_ARRAY: return ValueTypeInt64Arr + case databasev1.TagType_TAG_TYPE_TIMESTAMP: + return ValueTypeTimestamp default: panic("unknown tag value type") } @@ -92,6 +98,8 @@ func MustTagValueToStr(tag *modelv1.TagValue) string { return strconv.FormatInt(tag.GetInt().Value, 10) case *modelv1.TagValue_BinaryData: return fmt.Sprintf("%x", tag.GetBinaryData()) + case *modelv1.TagValue_Timestamp: + return tag.GetTimestamp().String() default: panic("unknown tag value type") } @@ -135,6 +143,12 @@ func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) { dest = marshalEntityValue(dest, encoding.Int64ToBytes(nil, tv.GetInt().Value)) case *modelv1.TagValue_BinaryData: dest = marshalEntityValue(dest, tv.GetBinaryData()) + case 
*modelv1.TagValue_Timestamp: + // Convert timestamp to 64-bit nanoseconds since epoch for efficient storage + ts := tv.GetTimestamp() + epochNanos := ts.Seconds*1e9 + int64(ts.Nanos) + tsBytes := encoding.Int64ToBytes(nil, epochNanos) + dest = marshalEntityValue(dest, tsBytes) default: return nil, errors.New("unsupported tag value type: " + tv.String()) } @@ -198,6 +212,25 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte, []byte, *modelv1.TagVal BinaryData: data, }, }, nil + case ValueTypeTimestamp: + if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil { + return nil, nil, nil, errors.WithMessage(err, "unmarshal timestamp tag value") + } + // Unmarshal 64-bit epoch nanoseconds and convert back to seconds + nanos + if len(dest) < 8 { // Need at least 8 bytes for the 64-bit value + return nil, src, nil, errors.New("insufficient bytes for timestamp") + } + epochNanos := encoding.BytesToInt64(dest) + seconds := epochNanos / 1e9 + nanos := int32(epochNanos % 1e9) + return dest, src, &modelv1.TagValue{ + Value: &modelv1.TagValue_Timestamp{ + Timestamp: &timestamppb.Timestamp{ + Seconds: seconds, + Nanos: nanos, + }, + }, + }, nil default: return nil, src, nil, fmt.Errorf("unsupported tag value type %d, tag value: %s", vt, src) } @@ -286,6 +319,15 @@ func MustCompareTagValue(tv1, tv2 *modelv1.TagValue) int { return int(tv1.GetInt().Value - tv2.GetInt().Value) case ValueTypeBinaryData: return bytes.Compare(tv1.GetBinaryData(), tv2.GetBinaryData()) + case ValueTypeTimestamp: + ts1 := tv1.GetTimestamp() + ts2 := tv2.GetTimestamp() + if ts1.Seconds < ts2.Seconds { + return -1 + } else if ts1.Seconds > ts2.Seconds { + return 1 + } + return 0 default: logger.Panicf("unsupported tag value type: %v", vt1) return 0 diff --git a/pkg/pb/v1/value_test.go b/pkg/pb/v1/value_test.go index 710fb2cf..6eeae75d 100644 --- a/pkg/pb/v1/value_test.go +++ b/pkg/pb/v1/value_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + 
"google.golang.org/protobuf/types/known/timestamppb" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" ) @@ -42,6 +43,14 @@ func TestMarshalAndUnmarshalTagValue(t *testing.T) { name: "binary data", src: &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: []byte("binaryData")}}, }, + { + name: "timestamp value", + src: &modelv1.TagValue{Value: &modelv1.TagValue_Timestamp{Timestamp: &timestamppb.Timestamp{Seconds: 1234567890, Nanos: 123456789}}}, + }, + { + name: "timestamp value with high precision", + src: &modelv1.TagValue{Value: &modelv1.TagValue_Timestamp{Timestamp: &timestamppb.Timestamp{Seconds: 0, Nanos: 999999999}}}, + }, { name: "unsupported type", src: &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}, diff --git a/pkg/schema/init.go b/pkg/schema/init.go index 8cbe0a12..5407b402 100644 --- a/pkg/schema/init.go +++ b/pkg/schema/init.go @@ -33,14 +33,15 @@ type revisionContext struct { group int64 measure int64 stream int64 + trace int64 indexRule int64 indexRuleBinding int64 topNAgg int64 } func (r revisionContext) String() string { - return fmt.Sprintf("Group: %d, Measure: %d, Stream: %d, IndexRule: %d, IndexRuleBinding: %d, TopNAgg: %d", - r.group, r.measure, r.stream, r.indexRule, r.indexRuleBinding, r.topNAgg) + return fmt.Sprintf("Group: %d, Measure: %d, Stream: %d, Trace: %d, IndexRule: %d, IndexRuleBinding: %d, TopNAgg: %d", + r.group, r.measure, r.stream, r.trace, r.indexRule, r.indexRuleBinding, r.topNAgg) } type revisionContextKey struct{} @@ -48,7 +49,7 @@ type revisionContextKey struct{} var revCtxKey = revisionContextKey{} func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) { - if kind != schema.KindMeasure && kind != schema.KindStream { + if kind != schema.KindMeasure && kind != schema.KindStream && kind != schema.KindTrace { return nil, nil } catalog := sr.getCatalog(kind) @@ -74,6 +75,10 @@ func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) { 
sr.l.Info().Stringer("revision", 
revCtx).Msg("init measures") return groupNames, []int64{revCtx.group, revCtx.measure, revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg} } + if kind == schema.KindTrace { + sr.l.Info().Stringer("revision", revCtx).Msg("init trace") + return groupNames, []int64{revCtx.group, revCtx.trace, revCtx.indexRuleBinding, revCtx.indexRule} + } sr.l.Info().Stringer("revision", revCtx).Msg("init stream") return groupNames, []int64{revCtx.group, revCtx.stream, revCtx.indexRuleBinding, revCtx.indexRule} } @@ -82,6 +87,9 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) commonv1.Catalog { if kind == schema.KindMeasure { return commonv1.Catalog_CATALOG_MEASURE } + if kind == schema.KindTrace { + return commonv1.Catalog_CATALOG_TRACE + } return commonv1.Catalog_CATALOG_STREAM } @@ -96,6 +104,10 @@ func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group, catal sr.processMeasure(ctx, g.Metadata.Name) return } + if catalog == commonv1.Catalog_CATALOG_TRACE { + sr.processTrace(ctx, g.Metadata.Name) + return + } sr.processStream(ctx, g.Metadata.Name) } @@ -178,6 +190,27 @@ func (sr *schemaRepo) processStream(ctx context.Context, gName string) { sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(ss)).Msg("store streams") } +func (sr *schemaRepo) processTrace(ctx context.Context, gName string) { + ctx, cancel := context.WithTimeout(ctx, initTimeout) + defer cancel() + start := time.Now() + tt, err := sr.metadata.TraceRegistry().ListTrace(ctx, schema.ListOpt{Group: gName}) + if err != nil { + logger.Panicf("fails to get the traces: %v", err) + return + } + revCtx := ctx.Value(revCtxKey).(*revisionContext) + for _, t := range tt { + if err := sr.storeResource(t); err != nil { + logger.Panicf("fails to store the trace: %v", err) + } + if t.Metadata.ModRevision > revCtx.trace { + revCtx.trace = t.Metadata.ModRevision + } + } + sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(tt)).Msg("store 
traces") +} + func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) { g, ok := sr.getGroup(groupSchema.Metadata.Name) if ok { diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 55ac3940..b54d74a3 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -42,6 +42,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/test/helpers" test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream" + test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace" ) const host = "localhost" @@ -51,6 +52,7 @@ func Standalone(flags ...string) (string, string, func()) { return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "trace"}, }, "", "", "", "", flags...) } @@ -59,6 +61,7 @@ func StandaloneWithAuth(username, password string, flags ...string) (string, str return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "trace"}, }, "", "", username, password, flags...) } @@ -67,6 +70,7 @@ func StandaloneWithTLS(certFile, keyFile string, flags ...string) (string, strin return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "trace"}, }, certFile, keyFile, "", "", flags...) } @@ -99,6 +103,7 @@ func ClosableStandalone(path string, ports []int, flags ...string) (string, stri return standaloneServer(path, ports, []SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "trace"}, }, "", "", flags...) 
} @@ -134,6 +139,7 @@ func standaloneServerWithAuth(path string, ports []int, schemaLoaders []SchemaLo "--measure-root-path=" + path, "--metadata-root-path=" + path, "--property-root-path=" + path, + "--trace-root-path=" + path, fmt.Sprintf("--etcd-listen-client-url=%s", endpoint), fmt.Sprintf("--etcd-listen-peer-url=http://%s:%d", host, ports[3]), } tlsEnabled := false @@ -202,6 +208,9 @@ func (p *preloadService) PreRun(ctx context.Context) error { if p.name == "stream" { return test_stream.PreloadSchema(ctx, p.registry) } + if p.name == "trace" { + return test_trace.PreloadSchema(ctx, p.registry) + } return test_measure.PreloadSchema(ctx, p.registry) } diff --git a/pkg/test/trace/etcd.go b/pkg/test/trace/etcd.go new file mode 100644 index 00000000..275c1765 --- /dev/null +++ b/pkg/test/trace/etcd.go @@ -0,0 +1,109 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package trace implements helpers to load schemas for testing. 
+package trace + +import ( + "context" + "embed" + "path" + + "github.com/pkg/errors" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" +) + +const ( + groupDir = "testdata/groups" + traceDir = "testdata/traces" + indexRuleDir = "testdata/index_rules" + indexRuleBindingDir = "testdata/index_rule_bindings" +) + +//go:embed testdata/* +var store embed.FS + +// PreloadSchema loads schemas from files in the booting process. +func PreloadSchema(ctx context.Context, e schema.Registry) error { + return loadAllSchemas(ctx, e) +} + +// loadAllSchemas loads all trace-related schemas from the testdata directory. +func loadAllSchemas(ctx context.Context, e schema.Registry) error { + return preloadSchemaWithFuncs(ctx, e, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(groupDir, &commonv1.Group{}, func(group *commonv1.Group) error { + return e.CreateGroup(ctx, group) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(traceDir, &databasev1.Trace{}, func(trace *databasev1.Trace) error { + _, innerErr := e.CreateTrace(ctx, trace) + return innerErr + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error { + return e.CreateIndexRule(ctx, indexRule) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error { + return e.CreateIndexRuleBinding(ctx, indexRuleBinding) + }) + }, + ) +} + +// preloadSchemaWithFuncs extracts the common logic for loading schemas. 
+func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders ...func(context.Context, schema.Registry) error) error { + for _, loader := range loaders { + if err := loader(ctx, e); err != nil { + return errors.WithStack(err) + } + } + return nil +} + +func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource T) error) error { + entries, err := store.ReadDir(dir) + if err != nil { + return err + } + for _, entry := range entries { + data, err := store.ReadFile(path.Join(dir, entry.Name())) + if err != nil { + return err + } + resource.ProtoReflect().Descriptor().RequiredNumbers() + if err := protojson.Unmarshal(data, resource); err != nil { + return err + } + if err := loadFn(resource); err != nil { + if errors.Is(err, schema.ErrGRPCAlreadyExists) { + return nil + } + return err + } + } + return nil +} diff --git a/pkg/test/trace/testdata/groups/test-trace-group.json b/pkg/test/trace/testdata/groups/test-trace-group.json new file mode 100644 index 00000000..3d7bbaf2 --- /dev/null +++ b/pkg/test/trace/testdata/groups/test-trace-group.json @@ -0,0 +1,19 @@ +{ + "metadata": { + "name": "test-trace-group" + }, + "catalog": "CATALOG_TRACE", + "resource_opts": { + "shard_num": 2, + "replicas": 1, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 3 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/trace/testdata/index_rule_bindings/sw.json b/pkg/test/trace/testdata/index_rule_bindings/sw.json new file mode 100644 index 00000000..7955985b --- /dev/null +++ b/pkg/test/trace/testdata/index_rule_bindings/sw.json @@ -0,0 +1,17 @@ +{ + "metadata": { + "name": "sw-index-rule-binding", + "group": "test-trace-group" + }, + "rules": [ + "duration", + "timestamp" + ], + "subject": { + "catalog": "CATALOG_TRACE", + "name": "sw" + }, + "begin_at": "2021-04-15T01:30:15.01Z", + "expire_at": "2121-04-15T01:30:15.01Z", + "updated_at": 
"2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/trace/testdata/index_rules/duration.json b/pkg/test/trace/testdata/index_rules/duration.json new file mode 100644 index 00000000..c9cb393a --- /dev/null +++ b/pkg/test/trace/testdata/index_rules/duration.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "duration", + "group": "test-trace-group" + }, + "tags": [ + "service_id", + "service_instance_id", + "state", + "duration" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/trace/testdata/index_rules/timestamp.json b/pkg/test/trace/testdata/index_rules/timestamp.json new file mode 100644 index 00000000..be83e872 --- /dev/null +++ b/pkg/test/trace/testdata/index_rules/timestamp.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "timestamp", + "group": "test-trace-group" + }, + "tags": [ + "service_id", + "service_instance_id", + "state", + "timestamp" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/trace/testdata/traces/sw.json b/pkg/test/trace/testdata/traces/sw.json new file mode 100644 index 00000000..37e21ed3 --- /dev/null +++ b/pkg/test/trace/testdata/traces/sw.json @@ -0,0 +1,39 @@ +{ + "metadata": { + "name": "sw", + "group": "test-trace-group" + }, + "tags": [ + { + "name": "trace_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "state", + "type": "TAG_TYPE_INT" + }, + { + "name": "service_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "service_instance_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "endpoint_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "duration", + "type": "TAG_TYPE_INT" + }, + { + "name": "timestamp", + "type": "TAG_TYPE_TIMESTAMP" + } + ], + "trace_id_tag_name": "trace_id", + "timestamp_tag_name": "timestamp", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/test/cases/init.go b/test/cases/init.go index 
5e7d0543..9d152f99 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -60,4 +60,8 @@ func Initialize(addr string, now time.Time) { casesmeasuredata.Write(conn, "duplicated", "exception", "duplicated.json", now, 0) casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated", "service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval) time.Sleep(5 * time.Second) + // trace + // nolint:gocritic + // interval = 500 * time.Millisecond + // casestrace.Write(conn, "sw", now, interval) } diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go new file mode 100644 index 00000000..fc70b23d --- /dev/null +++ b/test/cases/trace/data/data.go @@ -0,0 +1,206 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package data is used to test the trace service. 
+package data + +import ( + "context" + "embed" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "slices" + "strings" + "time" + + "github.com/google/go-cmp/cmp" + g "github.com/onsi/ginkgo/v2" + gm "github.com/onsi/gomega" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" + "sigs.k8s.io/yaml" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" +) + +//go:embed input/*.yml +var inputFS embed.FS + +//go:embed want/*.yml +var wantFS embed.FS + +//go:embed testdata/*.json +var dataFS embed.FS + +// VerifyFn verify whether the query response matches the wanted result. 
+var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args) { + i, err := inputFS.ReadFile("input/" + args.Input + ".yml") + innerGm.Expect(err).NotTo(gm.HaveOccurred()) + query := &tracev1.QueryRequest{} + helpers.UnmarshalYAML(i, query) + query.TimeRange = helpers.TimeRange(args, sharedContext) + query.Stages = args.Stages + c := tracev1.NewTraceServiceClient(sharedContext.Connection) + ctx := context.Background() + resp, err := c.Query(ctx, query) + if args.WantErr { + if err == nil { + g.Fail("expect error") + } + return + } + innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String()) + if args.WantEmpty { + innerGm.Expect(resp.Spans).To(gm.BeEmpty()) + return + } + if args.Want == "" { + args.Want = args.Input + } + ww, err := wantFS.ReadFile("want/" + args.Want + ".yml") + innerGm.Expect(err).NotTo(gm.HaveOccurred()) + want := &tracev1.QueryResponse{} + helpers.UnmarshalYAML(ww, want) + if args.DisOrder { + slices.SortFunc(want.Spans, func(a, b *tracev1.Span) int { + // Sort by first tag value for consistency + if len(a.Tags) > 0 && len(b.Tags) > 0 { + return strings.Compare(a.Tags[0].Value.GetStr().GetValue(), b.Tags[0].Value.GetStr().GetValue()) + } + return 0 + }) + slices.SortFunc(resp.Spans, func(a, b *tracev1.Span) int { + if len(a.Tags) > 0 && len(b.Tags) > 0 { + return strings.Compare(a.Tags[0].Value.GetStr().GetValue(), b.Tags[0].Value.GetStr().GetValue()) + } + return 0 + }) + } + var extra []cmp.Option + extra = append(extra, protocmp.IgnoreUnknown(), + protocmp.Transform()) + success := innerGm.Expect(cmp.Equal(resp, want, + extra...)). 
+ To(gm.BeTrue(), func() string { + var j []byte + j, err = protojson.Marshal(resp) + if err != nil { + return err.Error() + } + var y []byte + y, err = yaml.JSONToYAML(j) + if err != nil { + return err.Error() + } + return string(y) + }) + if !success { + return + } + query.Trace = true + resp, err = c.Query(ctx, query) + innerGm.Expect(err).NotTo(gm.HaveOccurred()) + innerGm.Expect(resp.TraceQueryResult).NotTo(gm.BeNil()) + innerGm.Expect(resp.TraceQueryResult.GetSpans()).NotTo(gm.BeEmpty()) +} + +func loadData(stream tracev1.TraceService_WriteClient, metadata *commonv1.Metadata, dataFile string, baseTime time.Time, interval time.Duration) { + var templates []interface{} + content, err := dataFS.ReadFile("testdata/" + dataFile) + gm.Expect(err).ShouldNot(gm.HaveOccurred()) + gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred()) + + for i, template := range templates { + // Extract span data from template + templateMap, ok := template.(map[string]interface{}) + gm.Expect(ok).To(gm.BeTrue()) + + // Get span data + spanData, ok := templateMap["span"].(string) + gm.Expect(ok).To(gm.BeTrue()) + spanBytes, err := base64.StdEncoding.DecodeString(spanData) + gm.Expect(err).ShouldNot(gm.HaveOccurred()) + + // Get tags data + tagsData, ok := templateMap["tags"].([]interface{}) + gm.Expect(ok).To(gm.BeTrue()) + + // Convert tags to TagValue format + var tagValues []*modelv1.TagValue + for _, tag := range tagsData { + tagBytes, err := json.Marshal(tag) + gm.Expect(err).ShouldNot(gm.HaveOccurred()) + tagValue := &modelv1.TagValue{} + gm.Expect(protojson.Unmarshal(tagBytes, tagValue)).ShouldNot(gm.HaveOccurred()) + tagValues = append(tagValues, tagValue) + } + + // Add timestamp tag as the last tag + timestamp := baseTime.Add(interval * time.Duration(i)) + timestampTag := &modelv1.TagValue{ + Value: &modelv1.TagValue_Timestamp{ + Timestamp: timestamppb.New(timestamp), + }, + } + tagValues = append(tagValues, timestampTag) + + errInner := 
stream.Send(&tracev1.WriteRequest{ + Metadata: metadata, + Tags: tagValues, + Span: spanBytes, + Version: uint64(i + 1), + }) + gm.Expect(errInner).ShouldNot(gm.HaveOccurred()) + } +} + +// Write writes trace data to the database. +func Write(conn *grpclib.ClientConn, name string, baseTime time.Time, interval time.Duration) { + WriteToGroup(conn, name, "test-trace-group", name, baseTime, interval) +} + +// WriteToGroup writes trace data to a specific group. +func WriteToGroup(conn *grpclib.ClientConn, name, group, fileName string, baseTime time.Time, interval time.Duration) { + metadata := &commonv1.Metadata{ + Name: name, + Group: group, + } + schema := databasev1.NewTraceRegistryServiceClient(conn) + resp, err := schema.Get(context.Background(), &databasev1.TraceRegistryServiceGetRequest{Metadata: metadata}) + gm.Expect(err).NotTo(gm.HaveOccurred()) + metadata = resp.GetTrace().GetMetadata() + + c := tracev1.NewTraceServiceClient(conn) + ctx := context.Background() + writeClient, err := c.Write(ctx) + gm.Expect(err).NotTo(gm.HaveOccurred()) + loadData(writeClient, metadata, fmt.Sprintf("%s.json", fileName), baseTime, interval) + gm.Expect(writeClient.CloseSend()).To(gm.Succeed()) + gm.Eventually(func() error { + _, err := writeClient.Recv() + return err + }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF)) +} diff --git a/test/cases/trace/data/input/all.yml b/test/cases/trace/data/input/all.yml new file mode 100644 index 00000000..8a295c06 --- /dev/null +++ b/test/cases/trace/data/input/all.yml @@ -0,0 +1,20 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "sw" +groups: ["test-trace-group"] +tag_projection: ["trace_id"] diff --git a/test/cases/trace/data/testdata/sw.json b/test/cases/trace/data/testdata/sw.json new file mode 100644 index 00000000..7213854a --- /dev/null +++ b/test/cases/trace/data/testdata/sw.json @@ -0,0 +1,177 @@ +[ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "int": { + "value": 1 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_1" + } + }, + { + "str": { + "value": "/home_endpoint" + } + }, + { + "int": { + "value": 1000 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + }, + { + "tags": [ + { + "str": { + "value": "2" + } + }, + { + "int": { + "value": 1 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_2" + } + }, + { + "str": { + "value": "/product_endpoint" + } + }, + { + "int": { + "value": 500 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + }, + { + "tags": [ + { + "str": { + "value": "3" + } + }, + { + "int": { + "value": 0 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_1" + } + }, + { + "str": { + "value": "/home_endpoint" + } + }, + { + "int": { + "value": 30 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + }, + { + "tags": [ + { + "str": { + "value": "4" + } + }, + { + "int": { + "value": 0 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_3" + } + }, + { + "str": { + "value": "/price_endpoint" + } + }, + { + "int": { + 
"value": 60 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + }, + { + "tags": [ + { + "str": { + "value": "5" + } + }, + { + "int": { + "value": 0 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_1" + } + }, + { + "str": { + "value": "/item_endpoint" + } + }, + { + "int": { + "value": 300 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + } +] \ No newline at end of file diff --git a/test/cases/trace/data/want/all.yml b/test/cases/trace/data/want/all.yml new file mode 100644 index 00000000..e29fbaf0 --- /dev/null +++ b/test/cases/trace/data/want/all.yml @@ -0,0 +1,48 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +spans: + - tags: + - key: trace_id + value: + str: + value: "1" + span: "YWJjMTIzIT8kKiYoKSctPUB+" + - tags: + - key: trace_id + value: + str: + value: "2" + span: "YWJjMTIzIT8kKiYoKSctPUB+" + - tags: + - key: trace_id + value: + str: + value: "3" + span: "YWJjMTIzIT8kKiYoKSctPUB+" + - tags: + - key: trace_id + value: + str: + value: "4" + span: "YWJjMTIzIT8kKiYoKSctPUB+" + - tags: + - key: trace_id + value: + str: + value: "5" + span: "YWJjMTIzIT8kKiYoKSctPUB+" diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go new file mode 100644 index 00000000..dd7eb4db --- /dev/null +++ b/test/cases/trace/trace.go @@ -0,0 +1,46 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package trace_test contains integration test cases of the trace. +package trace_test + +import ( + "time" + + g "github.com/onsi/ginkgo/v2" + gm "github.com/onsi/gomega" + + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + trace_test_data "github.com/apache/skywalking-banyandb/test/cases/trace/data" +) + +var ( + // SharedContext is the parallel execution context. 
+ SharedContext helpers.SharedContext + verify = func(innerGm gm.Gomega, args helpers.Args) { + trace_test_data.VerifyFn(innerGm, SharedContext, args) + } +) + +var _ = g.FDescribeTable("Scanning Traces", func(args helpers.Args) { + gm.Eventually(func(innerGm gm.Gomega) { + verify(innerGm, args) + }, flags.EventuallyTimeout).Should(gm.Succeed()) +}, + g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}), +) diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go index 7c576550..f760710a 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/standalone/query/query_suite_test.go @@ -82,6 +82,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { Connection: connection, BaseTime: now, } + // casestrace.SharedContext = helpers.SharedContext{ + // Connection: connection, + // BaseTime: now, + // } Expect(err).NotTo(HaveOccurred()) })