This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5bc319fec6b9a3a46a263e8807d984db9f277b06 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Aug 24 10:08:05 2025 +0800 Refactor block metadata handling: Update marshal and unmarshal methods for blockMetadata to improve serialization and deserialization efficiency. Modify tests to accommodate changes in metadata handling and ensure consistency in data processing across the codebase. --- banyand/internal/sidx/block_writer.go | 6 +- banyand/internal/sidx/metadata.go | 260 +++++++++++++-------------------- banyand/internal/sidx/metadata_test.go | 8 +- banyand/internal/sidx/part.go | 139 +++++++++--------- banyand/internal/sidx/part_iter.go | 4 +- banyand/internal/sidx/part_test.go | 30 ++-- 6 files changed, 191 insertions(+), 256 deletions(-) diff --git a/banyand/internal/sidx/block_writer.go b/banyand/internal/sidx/block_writer.go index 8067e009..99cc0019 100644 --- a/banyand/internal/sidx/block_writer.go +++ b/banyand/internal/sidx/block_writer.go @@ -308,11 +308,7 @@ func (bw *blockWriter) mustWriteBlock(sid common.SeriesID, b *block) { // Serialize block metadata bm.setSeriesID(sid) bm.setKeyRange(minKey, maxKey) - bmData, err := bm.marshal() - if err != nil { - logger.Panicf("failed to marshal block metadata: %v", err) - } - bw.primaryBlockData = append(bw.primaryBlockData, bmData...) + bw.primaryBlockData = bm.marshal(bw.primaryBlockData) if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize { bw.mustFlushPrimaryBlock(bw.primaryBlockData) diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go index 85b7dcab..c6ffd4d4 100644 --- a/banyand/internal/sidx/metadata.go +++ b/banyand/internal/sidx/metadata.go @@ -18,11 +18,9 @@ package sidx import ( - "bytes" - "encoding/binary" "encoding/json" "fmt" - "io" + "sort" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/encoding" @@ -268,188 +266,126 @@ func unmarshalPartMetadata(data []byte) (*partMetadata, error) { } // marshal serializes blockMetadata to bytes. -func (bm *blockMetadata) marshal() ([]byte, error) { - buf := &bytes.Buffer{} - - // Write seriesID - if err := binary.Write(buf, binary.LittleEndian, bm.seriesID); err != nil { - return nil, fmt.Errorf("failed to write seriesID: %w", err) - } - - // Write key range - if err := binary.Write(buf, binary.LittleEndian, bm.minKey); err != nil { - return nil, fmt.Errorf("failed to write minKey: %w", err) - } - if err := binary.Write(buf, binary.LittleEndian, bm.maxKey); err != nil { - return nil, fmt.Errorf("failed to write maxKey: %w", err) - } +func (bm *blockMetadata) marshal(dst []byte) []byte { + dst = bm.seriesID.AppendToBytes(dst) + dst = encoding.VarUint64ToBytes(dst, uint64(bm.minKey)) + dst = encoding.VarUint64ToBytes(dst, uint64(bm.maxKey)) + dst = encoding.VarUint64ToBytes(dst, bm.dataBlock.offset) + dst = encoding.VarUint64ToBytes(dst, bm.dataBlock.size) + dst = encoding.VarUint64ToBytes(dst, bm.keysBlock.offset) + dst = encoding.VarUint64ToBytes(dst, bm.keysBlock.size) + dst = append(dst, byte(bm.keysEncodeType)) + dst = encoding.VarUint64ToBytes(dst, bm.count) + dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSize) + dst = encoding.VarUint64ToBytes(dst, uint64(len(bm.tagsBlocks))) + + // Write tag blocks in sorted order for consistency + if len(bm.tagsBlocks) > 0 { + tagNames := make([]string, 0, len(bm.tagsBlocks)) + for tagName := range bm.tagsBlocks { + tagNames = append(tagNames, tagName) + } + sort.Strings(tagNames) - // Write data block - if err := binary.Write(buf, binary.LittleEndian, bm.dataBlock.offset); err != nil { - return nil, fmt.Errorf("failed to write data block offset: %w", err) - } - if err := binary.Write(buf, binary.LittleEndian, bm.dataBlock.size); err != nil { - return nil, fmt.Errorf("failed to write data block size: %w", err) + for _, tagName := range tagNames { + block := bm.tagsBlocks[tagName] + dst = encoding.EncodeBytes(dst, []byte(tagName)) + dst = encoding.VarUint64ToBytes(dst, block.offset) + dst = encoding.VarUint64ToBytes(dst, block.size) + } } - // Write keys block - if err := binary.Write(buf, binary.LittleEndian, bm.keysBlock.offset); err != nil { - return nil, fmt.Errorf("failed to write keys block offset: %w", err) - } - if err := binary.Write(buf, binary.LittleEndian, bm.keysBlock.size); err != nil { - return nil, fmt.Errorf("failed to write keys block size: %w", err) - } + return dst +} - // Write keys encoding information - if err := binary.Write(buf, binary.LittleEndian, byte(bm.keysEncodeType)); err != nil { - return nil, fmt.Errorf("failed to write keys encode type: %w", err) - } - if err := binary.Write(buf, binary.LittleEndian, bm.minKey); err != nil { - return nil, fmt.Errorf("failed to write keys first value: %w", err) +// unmarshal deserializes blockMetadata from bytes. +func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { + if len(src) < 8 { + return nil, fmt.Errorf("cannot unmarshal blockMetadata from less than 8 bytes") } + bm.seriesID = common.SeriesID(encoding.BytesToUint64(src)) + src = src[8:] - // Write count and uncompressed size - if err := binary.Write(buf, binary.LittleEndian, bm.count); err != nil { - return nil, fmt.Errorf("failed to write count: %w", err) - } - if err := binary.Write(buf, binary.LittleEndian, bm.uncompressedSize); err != nil { - return nil, fmt.Errorf("failed to write uncompressed size: %w", err) - } + var n uint64 + src, n = encoding.BytesToVarUint64(src) + bm.minKey = int64(n) - // Write tag blocks count - if err := binary.Write(buf, binary.LittleEndian, uint32(len(bm.tagsBlocks))); err != nil { - return nil, fmt.Errorf("failed to write tag blocks count: %w", err) - } + src, n = encoding.BytesToVarUint64(src) + bm.maxKey = int64(n) - // Write tag blocks - for tagName, tagBlock := range bm.tagsBlocks { - // Write tag name length and name - nameBytes := []byte(tagName) - if err := binary.Write(buf, binary.LittleEndian, uint32(len(nameBytes))); err != nil { - return nil, fmt.Errorf("failed to write tag name length: %w", err) - } - if _, err := buf.Write(nameBytes); err != nil { - return nil, fmt.Errorf("failed to write tag name: %w", err) - } + src, n = encoding.BytesToVarUint64(src) + bm.dataBlock.offset = n - // Write tag block - if err := binary.Write(buf, binary.LittleEndian, tagBlock.offset); err != nil { - return nil, fmt.Errorf("failed to write tag block offset: %w", err) - } - if err := binary.Write(buf, binary.LittleEndian, tagBlock.size); err != nil { - return nil, fmt.Errorf("failed to write tag block size: %w", err) - } - } + src, n = encoding.BytesToVarUint64(src) + bm.dataBlock.size = n - return buf.Bytes(), nil -} + src, n = encoding.BytesToVarUint64(src) + bm.keysBlock.offset = n -// unmarshalBlockMetadata deserializes blockMetadata from bytes. -func unmarshalBlockMetadata(data []byte) (*blockMetadata, error) { - bm := generateBlockMetadata() - buf := bytes.NewReader(data) + src, n = encoding.BytesToVarUint64(src) + bm.keysBlock.size = n - // Read seriesID - if err := binary.Read(buf, binary.LittleEndian, &bm.seriesID); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read seriesID: %w", err) + if len(src) < 1 { + return nil, fmt.Errorf("not enough bytes to read keysEncodeType") } + bm.keysEncodeType = encoding.EncodeType(src[0]) + src = src[1:] - // Read key range - if err := binary.Read(buf, binary.LittleEndian, &bm.minKey); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read minKey: %w", err) - } - if err := binary.Read(buf, binary.LittleEndian, &bm.maxKey); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read maxKey: %w", err) - } - - // Read data block - if err := binary.Read(buf, binary.LittleEndian, &bm.dataBlock.offset); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read data block offset: %w", err) - } - if err := binary.Read(buf, binary.LittleEndian, &bm.dataBlock.size); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read data block size: %w", err) - } + src, n = encoding.BytesToVarUint64(src) + bm.count = n - // Read keys block - if err := binary.Read(buf, binary.LittleEndian, &bm.keysBlock.offset); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read keys block offset: %w", err) - } - if err := binary.Read(buf, binary.LittleEndian, &bm.keysBlock.size); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read keys block size: %w", err) - } + src, n = encoding.BytesToVarUint64(src) + bm.uncompressedSize = n - // Read keys encoding information - var encodeTypeByte byte - if err := binary.Read(buf, binary.LittleEndian, &encodeTypeByte); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read keys encode type: %w", err) - } - bm.keysEncodeType = encoding.EncodeType(encodeTypeByte) - if err := binary.Read(buf, binary.LittleEndian, &bm.minKey); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read keys first value: %w", err) - } - - // Read count and uncompressed size - if err := binary.Read(buf, binary.LittleEndian, &bm.count); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read count: %w", err) - } - if err := binary.Read(buf, binary.LittleEndian, &bm.uncompressedSize); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read uncompressed size: %w", err) - } + src, n = encoding.BytesToVarUint64(src) + tagBlocksCount := n - // Read tag blocks count - var tagBlocksCount uint32 - if err := binary.Read(buf, binary.LittleEndian, &tagBlocksCount); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read tag blocks count: %w", err) - } - - // Read tag blocks - for i := uint32(0); i < tagBlocksCount; i++ { - // Read tag name - var nameLen uint32 - if err := binary.Read(buf, binary.LittleEndian, &nameLen); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read tag name length: %w", err) + if tagBlocksCount > 0 { + if bm.tagsBlocks == nil { + bm.tagsBlocks = make(map[string]dataBlock, tagBlocksCount) } - nameBytes := make([]byte, nameLen) - if _, err := io.ReadFull(buf, nameBytes); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read tag name: %w", err) + var nameBytes []byte + var err error + for i := uint64(0); i < tagBlocksCount; i++ { + src, nameBytes, err = encoding.DecodeBytes(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal tag name: %w", err) + } + tagName := string(nameBytes) + + src, n = encoding.BytesToVarUint64(src) + offset := n + + src, n = encoding.BytesToVarUint64(src) + size := n + + bm.tagsBlocks[tagName] = dataBlock{ + offset: offset, + size: size, + } } - tagName := string(nameBytes) + } + + return src, nil +} - // Read tag block - var tagBlock dataBlock - if err := binary.Read(buf, binary.LittleEndian, &tagBlock.offset); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read tag block offset: %w", err) +// unmarshalBlockMetadata deserializes multiple blockMetadata from bytes. +func unmarshalBlockMetadata(dst []blockMetadata, src []byte) ([]blockMetadata, error) { + dstOrig := dst + for len(src) > 0 { + if len(dst) < cap(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, blockMetadata{}) } - if err := binary.Read(buf, binary.LittleEndian, &tagBlock.size); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("failed to read tag block size: %w", err) + bm := &dst[len(dst)-1] + tail, err := bm.unmarshal(src) + if err != nil { + return dstOrig, fmt.Errorf("cannot unmarshal blockMetadata entries: %w", err) } - - bm.tagsBlocks[tagName] = tagBlock + src = tail } - - // Validate the metadata - if err := bm.validate(); err != nil { - releaseBlockMetadata(bm) - return nil, fmt.Errorf("block metadata validation failed: %w", err) - } - - return bm, nil + return dst, nil } // SeriesID returns the seriesID of the block. diff --git a/banyand/internal/sidx/metadata_test.go b/banyand/internal/sidx/metadata_test.go index 73346b96..81ec01fe 100644 --- a/banyand/internal/sidx/metadata_test.go +++ b/banyand/internal/sidx/metadata_test.go @@ -425,14 +425,14 @@ func TestBlockMetadata_Serialization(t *testing.T) { } // Test marshaling - data, err := original.marshal() - require.NoError(t, err) + data := original.marshal(nil) assert.NotEmpty(t, data) // Test unmarshaling - restored, err := unmarshalBlockMetadata(data) + restoredArray, err := unmarshalBlockMetadata(nil, data) require.NoError(t, err) - defer releaseBlockMetadata(restored) + require.Len(t, restoredArray, 1) + restored := &restoredArray[0] // Verify all fields match assert.Equal(t, original.seriesID, restored.seriesID) diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go index f51cb719..7aa6805d 100644 --- a/banyand/internal/sidx/part.go +++ b/banyand/internal/sidx/part.go @@ -93,10 +93,7 @@ func mustOpenPart(path string, fileSystem fs.FileSystem) *part { } // Load primary block metadata from primary.bin. - if err := p.loadPrimaryBlockMetadata(); err != nil { - p.close() - logger.GetLogger().Panic().Err(err).Str("path", path).Msg("failed to load primary block metadata") - } + p.loadPrimaryBlockMetadata() // Discover and open tag files. p.openTagFiles() @@ -136,11 +133,10 @@ func (p *part) loadPartMetadata() error { return nil } -// loadBlockMetadata reads and parses primary block metadata from meta.bin. -func (p *part) loadPrimaryBlockMetadata() error { +// loadPrimaryBlockMetadata reads and parses primary block metadata from meta.bin. +func (p *part) loadPrimaryBlockMetadata() { // Load primary block metadata from meta.bin file (compressed primaryBlockMetadata) p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], p.meta) - return nil } // openTagFiles discovers and opens all tag files in the part directory. @@ -278,8 +274,6 @@ func (p *part) readAll() ([]*elements, error) { compressedKeysBuf := make([]byte, 0, 1024) keysBuf := make([]byte, 0, 1024) - bytesDecoder := &encoding.BytesBlockDecoder{} - for _, pbm := range p.primaryBlockMetadata { // Read and decompress primary block metadata compressedPrimaryBuf = bytes.ResizeOver(compressedPrimaryBuf, int(pbm.size)) @@ -295,8 +289,8 @@ func (p *part) readAll() ([]*elements, error) { return nil, fmt.Errorf("cannot decompress primary block: %w", err) } - // Unmarshal block metadata - bm, err := unmarshalBlockMetadata(primaryBuf) + // Unmarshal all block metadata entries in this primary block + blockMetadataArray, err := unmarshalBlockMetadata(nil, primaryBuf) if err != nil { // Clean up any elements created so far for _, e := range result { @@ -305,78 +299,83 @@ func (p *part) readAll() ([]*elements, error) { return nil, fmt.Errorf("cannot unmarshal block metadata: %w", err) } - // Create elements for this block - elems := generateElements() + // Process each block metadata + for i := range blockMetadataArray { + bm := &blockMetadataArray[i] - // Read user keys - compressedKeysBuf = bytes.ResizeOver(compressedKeysBuf, int(bm.keysBlock.size)) - fs.MustReadData(p.keys, int64(bm.keysBlock.offset), compressedKeysBuf) - - keysBuf, err = zstd.Decompress(keysBuf[:0], compressedKeysBuf) - if err != nil { - releaseElements(elems) - for _, e := range result { - releaseElements(e) - } - return nil, fmt.Errorf("cannot decompress keys block: %w", err) - } - - // Decode user keys using the stored encoding information - elems.userKeys, err = encoding.BytesToInt64List(elems.userKeys[:0], keysBuf, bm.keysEncodeType, bm.minKey, int(bm.count)) - if err != nil { - releaseElements(elems) - for _, e := range result { - releaseElements(e) - } - return nil, fmt.Errorf("cannot decode user keys: %w", err) - } + // Create elements for this block + elems := generateElements() - // Read data payloads - compressedDataBuf = bytes.ResizeOver(compressedDataBuf, int(bm.dataBlock.size)) - fs.MustReadData(p.data, int64(bm.dataBlock.offset), compressedDataBuf) + // Read user keys + compressedKeysBuf = bytes.ResizeOver(compressedKeysBuf, int(bm.keysBlock.size)) + fs.MustReadData(p.keys, int64(bm.keysBlock.offset), compressedKeysBuf) - dataBuf, err = zstd.Decompress(dataBuf[:0], compressedDataBuf) - if err != nil { - releaseElements(elems) - for _, e := range result { - releaseElements(e) + keysBuf, err = zstd.Decompress(keysBuf[:0], compressedKeysBuf) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot decompress keys block: %w", err) } - return nil, fmt.Errorf("cannot decompress data block: %w", err) - } - // Decode data payloads - bytesDecoder.Reset() - elems.data, err = bytesDecoder.Decode(elems.data[:0], dataBuf, bm.count) - if err != nil { - releaseElements(elems) - for _, e := range result { - releaseElements(e) + // Decode user keys using the stored encoding information + elems.userKeys, err = encoding.BytesToInt64List(elems.userKeys[:0], keysBuf, bm.keysEncodeType, bm.minKey, int(bm.count)) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot decode user keys: %w", err) } - return nil, fmt.Errorf("cannot decode data payloads: %w", err) - } - // Initialize seriesIDs and tags slices - elems.seriesIDs = make([]common.SeriesID, int(bm.count)) - elems.tags = make([][]tag, int(bm.count)) + // Read data payloads + compressedDataBuf = bytes.ResizeOver(compressedDataBuf, int(bm.dataBlock.size)) + fs.MustReadData(p.data, int64(bm.dataBlock.offset), compressedDataBuf) - // Fill seriesIDs - all elements in this block have the same seriesID - for i := range elems.seriesIDs { - elems.seriesIDs[i] = bm.seriesID - } + dataBuf, err = zstd.Decompress(dataBuf[:0], compressedDataBuf) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot decompress data block: %w", err) + } - // Read tags for each tag name - for tagName := range bm.tagsBlocks { - err = p.readBlockTags(tagName, bm, elems) + // Decode data payloads - create a new decoder for each block to avoid state corruption + blockBytesDecoder := &encoding.BytesBlockDecoder{} + elems.data, err = blockBytesDecoder.Decode(elems.data[:0], dataBuf, bm.count) if err != nil { releaseElements(elems) for _, e := range result { releaseElements(e) } - return nil, fmt.Errorf("cannot read tags for %s: %w", tagName, err) + return nil, fmt.Errorf("cannot decode data payloads: %w", err) + } + + // Initialize seriesIDs and tags slices + elems.seriesIDs = make([]common.SeriesID, int(bm.count)) + elems.tags = make([][]tag, int(bm.count)) + + // Fill seriesIDs - all elements in this block have the same seriesID + for j := range elems.seriesIDs { + elems.seriesIDs[j] = bm.seriesID } - } - result = append(result, elems) + // Read tags for each tag name + for tagName := range bm.tagsBlocks { + err = p.readBlockTags(tagName, bm, elems) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot read tags for %s: %w", tagName, err) + } + } + + result = append(result, elems) + } } return result, nil @@ -426,11 +425,15 @@ func (p *part) readBlockTags(tagName string, bm *blockMetadata, elems *elements) return fmt.Errorf("cannot decode tag values: %w", err) } - // Add tag values to elements + // Add tag values to elements (only for non-nil values) for i, value := range tagValues { if i >= len(elems.tags) { break } + // Skip nil values - they represent missing tags for this element + if value == nil { + continue + } if elems.tags[i] == nil { elems.tags[i] = make([]tag, 0, 1) } diff --git a/banyand/internal/sidx/part_iter.go b/banyand/internal/sidx/part_iter.go index b8ccee98..a090bae5 100644 --- a/banyand/internal/sidx/part_iter.go +++ b/banyand/internal/sidx/part_iter.go @@ -187,11 +187,11 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, pbm *primaryBlockMetad if err != nil { return nil, fmt.Errorf("cannot decompress index block: %w", err) } - bm, err := unmarshalBlockMetadata(pi.primaryBuf) + blockMetadataArray, err := unmarshalBlockMetadata(nil, pi.primaryBuf) if err != nil { return nil, fmt.Errorf("cannot unmarshal index block: %w", err) } - bms = append(bms, *bm) + bms = append(bms, blockMetadataArray...) return bms, nil } diff --git a/banyand/internal/sidx/part_test.go b/banyand/internal/sidx/part_test.go index 71f62ef6..a764cb00 100644 --- a/banyand/internal/sidx/part_test.go +++ b/banyand/internal/sidx/part_test.go @@ -121,15 +121,16 @@ func TestPartStringRepresentation(t *testing.T) { }, } - // Marshal and compress primary block metadata + // Marshal and compress primary block metadata for meta.bin primaryData := pbm.marshal(nil) compressedPrimaryData := zstd.Compress(nil, primaryData, 1) testFiles := map[string][]byte{ - primaryFilename: compressedPrimaryData, - dataFilename: []byte("data"), - keysFilename: []byte("keys"), - metaFilename: metaData, + primaryFilename: []byte("primary"), // placeholder for primary.bin + dataFilename: []byte("data"), + keysFilename: []byte("keys"), + metaFilename: compressedPrimaryData, // meta.bin should contain compressed primary block metadata + manifestFilename: metaData, // manifest.json should contain part metadata } for fileName, content := range testFiles { @@ -216,8 +217,8 @@ func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) { tempDir := t.TempDir() tests := []struct { - name string elements *elements + name string }{ { name: "single series with single element", @@ -390,13 +391,13 @@ func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) { // Create a copy of elements for initialization to avoid modifying the original elementsCopy := generateElements() defer releaseElements(elementsCopy) - + // Deep copy the elements elementsCopy.seriesIDs = append(elementsCopy.seriesIDs, tt.elements.seriesIDs...) elementsCopy.userKeys = append(elementsCopy.userKeys, tt.elements.userKeys...) elementsCopy.data = append(elementsCopy.data, tt.elements.data...) elementsCopy.tags = append(elementsCopy.tags, tt.elements.tags...) - + // Initialize memPart from elements copy mp.mustInitFromElements(elementsCopy) @@ -408,7 +409,6 @@ func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) { part := mustOpenPart(partDir, testFS) defer part.close() - // Step 4: Read all elements back from part resultElements, err := part.readAll() require.NoError(t, err, "readAll should not return error") @@ -430,7 +430,7 @@ func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) { // Create a clean copy of original elements for comparison (avoid sorting corruption) originalCopy := generateElements() defer releaseElements(originalCopy) - + originalCopy.seriesIDs = append(originalCopy.seriesIDs, tt.elements.seriesIDs...) originalCopy.userKeys = append(originalCopy.userKeys, tt.elements.userKeys...) originalCopy.data = append(originalCopy.data, tt.elements.data...) @@ -451,15 +451,15 @@ func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) { } } -// testElement represents a single element for test creation +// testElement represents a single element for test creation. type testElement struct { - seriesID common.SeriesID - userKey int64 data []byte tags []tag + seriesID common.SeriesID + userKey int64 } -// createTestElements creates an elements collection from test data +// createTestElements creates an elements collection from test data. func createTestElements(testElems []testElement) *elements { elems := generateElements() @@ -488,7 +488,7 @@ func createTestElements(testElems []testElement) *elements { return elems } -// compareElements compares two elements collections for equality +// compareElements compares two elements collections for equality. func compareElements(t *testing.T, expected, actual *elements) { require.Equal(t, len(expected.seriesIDs), len(actual.seriesIDs), "seriesIDs length mismatch") require.Equal(t, len(expected.userKeys), len(actual.userKeys), "userKeys length mismatch")