This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b9b800a2e6c93d8520b4ece16d553e5fa87a1f98
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Aug 27 07:59:06 2025 +0800

    Add copyFrom method for blockMetadata

    Implement deep-copy functionality to ensure accurate duplication of
    metadata fields, including handling of tags and projections. Introduce
    comprehensive tests to validate copy behavior across various scenarios,
    including edge cases and overwrites of existing data.
---
 banyand/internal/sidx/metadata.go          |  27 +
 banyand/internal/sidx/metadata_test.go     | 341 ++++
 banyand/internal/sidx/part_iter.go         |  22 -
 banyand/internal/sidx/query_result.go      |  16 +-
 banyand/internal/sidx/query_result_test.go | 806 ++++++++++++++++++++++++++++-
 5 files changed, 1181 insertions(+), 31 deletions(-)

diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go
index e8b961cb..dbdf6715 100644
--- a/banyand/internal/sidx/metadata.go
+++ b/banyand/internal/sidx/metadata.go
@@ -20,6 +20,7 @@ package sidx
 import (
     "encoding/json"
     "fmt"
+    "maps"
     "sort"

     "github.com/apache/skywalking-banyandb/api/common"
@@ -159,6 +160,32 @@ func (bm *blockMetadata) reset() {
     bm.tagProjection = bm.tagProjection[:0]
 }

+func (bm *blockMetadata) copyFrom(other *blockMetadata) {
+    if other == nil {
+        return
+    }
+
+    bm.seriesID = other.seriesID
+    bm.minKey = other.minKey
+    bm.maxKey = other.maxKey
+    bm.count = other.count
+    bm.uncompressedSize = other.uncompressedSize
+    bm.keysEncodeType = other.keysEncodeType
+    bm.dataBlock = other.dataBlock
+    bm.keysBlock = other.keysBlock
+
+    // Copy tag blocks
+    if bm.tagsBlocks == nil {
+        bm.tagsBlocks = make(map[string]dataBlock)
+    }
+    clear(bm.tagsBlocks)
+    maps.Copy(bm.tagsBlocks, other.tagsBlocks)
+
+    // Copy tag projection
+    bm.tagProjection = bm.tagProjection[:0]
+    bm.tagProjection = append(bm.tagProjection, other.tagProjection...)
+}
+
 // validate validates the partMetadata for consistency.
 func (pm *partMetadata) validate() error {
     if pm.MinKey > pm.MaxKey {
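A minimal usage sketch of the new method may help when reviewing the hunk above. The values below are hypothetical, and the snippet assumes it runs inside the sidx package where blockMetadata and dataBlock are defined; the point is the deep-copy guarantee the tests below pin down: mutating the source after the copy must not affect the destination.

    // Sketch only: field values are made up for illustration.
    src := &blockMetadata{
        seriesID:      common.SeriesID(1),
        tagsBlocks:    map[string]dataBlock{"service_id": {offset: 64, size: 128}},
        tagProjection: []string{"service_id"},
    }
    dst := &blockMetadata{}
    dst.copyFrom(src)

    // The map and slice were copied element by element, so later
    // mutations of src do not leak into dst.
    src.tagsBlocks["service_id"] = dataBlock{}
    src.tagProjection[0] = "mutated"
    fmt.Println(dst.tagsBlocks["service_id"], dst.tagProjection[0]) // {64 128} service_id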
diff --git a/banyand/internal/sidx/metadata_test.go b/banyand/internal/sidx/metadata_test.go
index 81ec01fe..7cd65e34 100644
--- a/banyand/internal/sidx/metadata_test.go
+++ b/banyand/internal/sidx/metadata_test.go
@@ -25,6 +25,7 @@ import (
     "github.com/stretchr/testify/require"

     "github.com/apache/skywalking-banyandb/api/common"
+    "github.com/apache/skywalking-banyandb/pkg/encoding"
 )

 func TestPartMetadata_Validation(t *testing.T) {
@@ -639,3 +640,343 @@ func TestMetadata_Reset(t *testing.T) {
     assert.Equal(t, dataBlock{}, bm.keysBlock)
     assert.Equal(t, 0, len(bm.tagsBlocks))
 }
+
+func TestBlockMetadata_CopyFrom(t *testing.T) {
+    tests := []struct {
+        name           string
+        source         *blockMetadata
+        target         *blockMetadata
+        expectedResult *blockMetadata
+        description    string
+    }{
+        {
+            name: "copy complete metadata",
+            source: &blockMetadata{
+                seriesID: common.SeriesID(123),
+                minKey: 10,
+                maxKey: 100,
+                uncompressedSize: 2048,
+                count: 50,
+                keysEncodeType: 1,
+                dataBlock: dataBlock{offset: 1000, size: 2048},
+                keysBlock: dataBlock{offset: 3048, size: 512},
+                tagsBlocks: map[string]dataBlock{
+                    "service_id": {offset: 3560, size: 256},
+                    "endpoint":   {offset: 3816, size: 512},
+                    "status":     {offset: 4328, size: 128},
+                },
+                tagProjection: []string{"service_id", "endpoint", "status"},
+            },
+            target: &blockMetadata{},
+            expectedResult: &blockMetadata{
+                seriesID: common.SeriesID(123),
+                minKey: 10,
+                maxKey: 100,
+                uncompressedSize: 2048,
+                count: 50,
+                keysEncodeType: 1,
+                dataBlock: dataBlock{offset: 1000, size: 2048},
+                keysBlock: dataBlock{offset: 3048, size: 512},
+                tagsBlocks: map[string]dataBlock{
+                    "service_id": {offset: 3560, size: 256},
+                    "endpoint":   {offset: 3816, size: 512},
+                    "status":     {offset: 4328, size: 128},
+                },
+                tagProjection: []string{"service_id", "endpoint", "status"},
+            },
+            description: "should copy all fields from source to target",
+        },
+        {
+            name: "copy to target with existing data",
+            source: &blockMetadata{
+                seriesID: common.SeriesID(456),
+                minKey: 20,
+                maxKey: 200,
+                uncompressedSize: 4096,
+                count: 100,
+                keysEncodeType: 2,
+                dataBlock: dataBlock{offset: 2000, size: 4096},
+                keysBlock: dataBlock{offset: 6096, size: 1024},
+                tagsBlocks: map[string]dataBlock{
+                    "new_tag": {offset: 7120, size: 512},
+                },
+                tagProjection: []string{"new_tag"},
+            },
+            target: &blockMetadata{
+                seriesID: common.SeriesID(999),
+                minKey: 999,
+                maxKey: 9999,
+                uncompressedSize: 9999,
+                count: 9999,
+                keysEncodeType: 99,
+                dataBlock: dataBlock{offset: 9999, size: 9999},
+                keysBlock: dataBlock{offset: 9999, size: 9999},
+                tagsBlocks: map[string]dataBlock{
+                    "old_tag": {offset: 9999, size: 9999},
+                },
+                tagProjection: []string{"old_tag"},
+            },
+            expectedResult: &blockMetadata{
+                seriesID: common.SeriesID(456),
+                minKey: 20,
+                maxKey: 200,
+                uncompressedSize: 4096,
+                count: 100,
+                keysEncodeType: 2,
+                dataBlock: dataBlock{offset: 2000, size: 4096},
+                keysBlock: dataBlock{offset: 6096, size: 1024},
+                tagsBlocks: map[string]dataBlock{
+                    "new_tag": {offset: 7120, size: 512},
+                },
+                tagProjection: []string{"new_tag"},
+            },
+            description: "should overwrite all existing data in target",
+        },
+        {
+            name:   "copy empty metadata",
+            source: &blockMetadata{},
+            target: &blockMetadata{},
+            expectedResult: &blockMetadata{
+                tagsBlocks:    map[string]dataBlock{},
+                tagProjection: []string{},
+            },
+            description: "should handle empty source metadata",
+        },
+        {
+            name: "copy with nil tagsBlocks in target",
+            source: &blockMetadata{
+                seriesID: common.SeriesID(789),
+                minKey: 30,
+                maxKey: 300,
+                uncompressedSize: 1024,
+                count: 25,
+                keysEncodeType: 0,
+                dataBlock: dataBlock{offset: 3000, size: 1024},
+                keysBlock: dataBlock{offset: 4024, size: 256},
+                tagsBlocks: map[string]dataBlock{
+                    "tag1": {offset: 4280, size: 128},
+                },
+                tagProjection: []string{"tag1"},
+            },
+            target: &blockMetadata{
+                tagsBlocks: nil, // nil map
+            },
+            expectedResult: &blockMetadata{
+                seriesID: common.SeriesID(789),
+                minKey: 30,
+                maxKey: 300,
+                uncompressedSize: 1024,
+                count: 25,
+                keysEncodeType: 0,
+                dataBlock: dataBlock{offset: 3000, size: 1024},
+                keysBlock: dataBlock{offset: 4024, size: 256},
+                tagsBlocks: map[string]dataBlock{
+                    "tag1": {offset: 4280, size: 128},
+                },
+                tagProjection: []string{"tag1"},
+            },
+            description: "should initialize nil tagsBlocks map in target",
+        },
+        {
+            name: "copy with empty tagsBlocks in target",
+            source: &blockMetadata{
+                seriesID: common.SeriesID(111),
+                minKey: 40,
+                maxKey: 400,
+                uncompressedSize: 512,
+                count: 10,
+                keysEncodeType: 3,
+                dataBlock: dataBlock{offset: 4000, size: 512},
+                keysBlock: dataBlock{offset: 4512, size: 128},
+                tagsBlocks: map[string]dataBlock{},
+                tagProjection: []string{},
+            },
+            target: &blockMetadata{
+                tagsBlocks: map[string]dataBlock{
+                    "old_tag1": {offset: 9999, size: 9999},
+                    "old_tag2": {offset: 9999, size: 9999},
+                },
+                tagProjection: []string{"old_tag1", "old_tag2"},
+            },
+            expectedResult: &blockMetadata{
+                seriesID: common.SeriesID(111),
+                minKey: 40,
+                maxKey: 400,
+                uncompressedSize: 512,
+                count: 10,
+                keysEncodeType: 3,
+                dataBlock: dataBlock{offset: 4000, size: 512},
+                keysBlock: dataBlock{offset: 4512, size: 128},
+                tagsBlocks: map[string]dataBlock{},
+                tagProjection: []string{},
+            },
+            description: "should clear existing tagsBlocks and tagProjection in target",
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            // Create a copy of the target to avoid modifying the original test data
+            targetCopy := &blockMetadata{}
+            if tt.target.tagsBlocks != nil {
+                targetCopy.tagsBlocks = make(map[string]dataBlock)
+                for k, v := range tt.target.tagsBlocks {
+                    targetCopy.tagsBlocks[k] = v
+                }
+            }
+            if tt.target.tagProjection != nil {
+                targetCopy.tagProjection = make([]string, len(tt.target.tagProjection))
+                copy(targetCopy.tagProjection, tt.target.tagProjection)
+            }
+            targetCopy.seriesID = tt.target.seriesID
+            targetCopy.minKey = tt.target.minKey
+            targetCopy.maxKey = tt.target.maxKey
+            targetCopy.uncompressedSize = tt.target.uncompressedSize
+            targetCopy.count = tt.target.count
+            targetCopy.keysEncodeType = tt.target.keysEncodeType
+            targetCopy.dataBlock = tt.target.dataBlock
+            targetCopy.keysBlock = tt.target.keysBlock
+
+            // Execute copyFrom
+            targetCopy.copyFrom(tt.source)
+
+            // Verify all fields are copied correctly
+            assert.Equal(t, tt.expectedResult.seriesID, targetCopy.seriesID, "seriesID mismatch")
+            assert.Equal(t, tt.expectedResult.minKey, targetCopy.minKey, "minKey mismatch")
+            assert.Equal(t, tt.expectedResult.maxKey, targetCopy.maxKey, "maxKey mismatch")
+            assert.Equal(t, tt.expectedResult.uncompressedSize, targetCopy.uncompressedSize, "uncompressedSize mismatch")
+            assert.Equal(t, tt.expectedResult.count, targetCopy.count, "count mismatch")
+            assert.Equal(t, tt.expectedResult.keysEncodeType, targetCopy.keysEncodeType, "keysEncodeType mismatch")
+            assert.Equal(t, tt.expectedResult.dataBlock, targetCopy.dataBlock, "dataBlock mismatch")
+            assert.Equal(t, tt.expectedResult.keysBlock, targetCopy.keysBlock, "keysBlock mismatch")
+
+            // Verify tagsBlocks
+            if tt.expectedResult.tagsBlocks == nil {
+                assert.Nil(t, targetCopy.tagsBlocks, "tagsBlocks should be nil")
+            } else {
+                assert.NotNil(t, targetCopy.tagsBlocks, "tagsBlocks should not be nil")
+                assert.Equal(t, len(tt.expectedResult.tagsBlocks), len(targetCopy.tagsBlocks), "tagsBlocks length mismatch")
+                for k, v := range tt.expectedResult.tagsBlocks {
+                    assert.Equal(t, v, targetCopy.tagsBlocks[k], "tagsBlocks[%s] mismatch", k)
+                }
+            }
+
+            // Verify tagProjection
+            assert.Equal(t, len(tt.expectedResult.tagProjection), len(targetCopy.tagProjection), "tagProjection length mismatch")
+            for i, v := range tt.expectedResult.tagProjection {
+                assert.Equal(t, v, targetCopy.tagProjection[i], "tagProjection[%d] mismatch", i)
+            }
+        })
+    }
+}
+
+func TestBlockMetadata_CopyFrom_DeepCopy(t *testing.T) {
+    // Test that copyFrom creates a deep copy, not just references
+    source := &blockMetadata{
+        seriesID: common.SeriesID(123),
+        minKey: 10,
+        maxKey: 100,
+        uncompressedSize: 2048,
+        count: 50,
+        keysEncodeType: 1,
+        dataBlock: dataBlock{offset: 1000, size: 2048},
+        keysBlock: dataBlock{offset: 3048, size: 512},
+        tagsBlocks: map[string]dataBlock{
+            "tag1": {offset: 3560, size: 256},
+        },
+        tagProjection: []string{"tag1"},
+    }
+
+    target := &blockMetadata{}
+    target.copyFrom(source)
+
+    // Verify it's a deep copy by modifying the source
+    source.seriesID = common.SeriesID(999)
+    source.minKey = 999
+    source.maxKey = 9999
+    source.uncompressedSize = 9999
+    source.count = 9999
+    source.keysEncodeType = 99
+    source.dataBlock = dataBlock{offset: 9999, size: 9999}
+    source.keysBlock = dataBlock{offset: 9999, size: 9999}
+    source.tagsBlocks["tag1"] = dataBlock{offset: 9999, size: 9999}
+    source.tagProjection[0] = "modified_tag"
+
+    // Target should remain unchanged
+    assert.Equal(t, common.SeriesID(123), target.seriesID, "target seriesID should not change when source is modified")
+    assert.Equal(t, int64(10), target.minKey, "target minKey should not change when source is modified")
+    assert.Equal(t, int64(100), target.maxKey, "target maxKey should not change when source is modified")
+    assert.Equal(t, uint64(2048), target.uncompressedSize, "target uncompressedSize should not change when source is modified")
+    assert.Equal(t, uint64(50), target.count, "target count should not change when source is modified")
+    assert.Equal(t, encoding.EncodeType(1), target.keysEncodeType, "target keysEncodeType should not change when source is modified")
+    assert.Equal(t, dataBlock{offset: 1000, size: 2048}, target.dataBlock, "target dataBlock should not change when source is modified")
+    assert.Equal(t, dataBlock{offset: 3048, size: 512}, target.keysBlock, "target keysBlock should not change when source is modified")
+    assert.Equal(t, dataBlock{offset: 3560, size: 256}, target.tagsBlocks["tag1"], "target tagsBlocks should not change when source is modified")
+    assert.Equal(t, "tag1", target.tagProjection[0], "target tagProjection should not change when source is modified")
+}
+
+func TestBlockMetadata_CopyFrom_EdgeCases(t *testing.T) {
+    t.Run("copy from nil source", func(t *testing.T) {
+        target := &blockMetadata{
+            seriesID: 123,
+            minKey: 10,
+            maxKey: 100,
+        }
+
+        // This should not panic and should leave target unchanged
+        assert.NotPanics(t, func() {
+            target.copyFrom(nil)
+        })
+
+        // Target should remain unchanged
+        assert.Equal(t, common.SeriesID(123), target.seriesID)
+        assert.Equal(t, int64(10), target.minKey)
+        assert.Equal(t, int64(100), target.maxKey)
+    })
+
+    t.Run("copy to nil target", func(t *testing.T) {
+        source := &blockMetadata{
+            seriesID: 456,
+            minKey: 20,
+            maxKey: 200,
+        }
+
+        // Calling copyFrom on a nil receiver should panic
+        assert.Panics(t, func() {
+            var target *blockMetadata
+            target.copyFrom(source)
+        })
+    })
+
+    t.Run("copy with very large values", func(t *testing.T) {
+        source := &blockMetadata{
+            seriesID: common.SeriesID(^uint64(0)), // Max uint64
+            minKey: -1 << 63, // Min int64
+            maxKey: 1<<63 - 1, // Max int64
+            uncompressedSize: ^uint64(0), // Max uint64
+            count: ^uint64(0), // Max uint64
+            keysEncodeType: encoding.EncodeType(^uint8(0)), // Max uint8
+            dataBlock: dataBlock{offset: ^uint64(0), size: ^uint64(0)},
+            keysBlock: dataBlock{offset: ^uint64(0), size: ^uint64(0)},
+            tagsBlocks: map[string]dataBlock{
+                "large_tag": {offset: ^uint64(0), size: ^uint64(0)},
+            },
+            tagProjection: []string{"large_tag"},
+        }
+
+        target := &blockMetadata{}
+        target.copyFrom(source)
+
+        // Verify all large values are copied correctly
+        assert.Equal(t, source.seriesID, target.seriesID)
+        assert.Equal(t, source.minKey, target.minKey)
+        assert.Equal(t, source.maxKey, target.maxKey)
+        assert.Equal(t, source.uncompressedSize, target.uncompressedSize)
+        assert.Equal(t, source.count, target.count)
+        assert.Equal(t, source.keysEncodeType, target.keysEncodeType)
+        assert.Equal(t, source.dataBlock, target.dataBlock)
+        assert.Equal(t, source.keysBlock, target.keysBlock)
+        assert.Equal(t, source.tagsBlocks, target.tagsBlocks)
+        assert.Equal(t, source.tagProjection, target.tagProjection)
+    })
+}
diff --git a/banyand/internal/sidx/part_iter.go b/banyand/internal/sidx/part_iter.go
index 3c0cb801..946a1a6c 100644
--- a/banyand/internal/sidx/part_iter.go
+++ b/banyand/internal/sidx/part_iter.go
@@ -21,7 +21,6 @@ import (
     "errors"
     "fmt"
     "io"
-    "maps"
     "sort"

     "github.com/apache/skywalking-banyandb/api/common"
@@ -259,24 +258,3 @@ func (pi *partIter) findBlock() bool {
     pi.bms = nil
     return false
 }
-
-func (bm *blockMetadata) copyFrom(other *blockMetadata) {
-    bm.seriesID = other.seriesID
-    bm.minKey = other.minKey
-    bm.maxKey = other.maxKey
-    bm.count = other.count
-    bm.uncompressedSize = other.uncompressedSize
-    bm.dataBlock = other.dataBlock
-    bm.keysBlock = other.keysBlock
-
-    // Copy tag blocks
-    if bm.tagsBlocks == nil {
-        bm.tagsBlocks = make(map[string]dataBlock)
-    }
-    clear(bm.tagsBlocks)
-    maps.Copy(bm.tagsBlocks, other.tagsBlocks)
-
-    // Copy tag projection
-    bm.tagProjection = bm.tagProjection[:0]
-    bm.tagProjection = append(bm.tagProjection, other.tagProjection...)
-}
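Reading the removal above against the new implementation in metadata.go, two behavioral differences stand out: the relocated method now guards against a nil source, and it also copies keysEncodeType, which this old version silently dropped. A short sketch of what the guard does and does not protect against (hypothetical, assuming it runs inside the sidx package):

    var bm blockMetadata
    bm.copyFrom(nil) // safe: the new guard returns early instead of panicking

    var nilTarget *blockMetadata
    // nilTarget.copyFrom(&bm) would still panic: the receiver itself is nil,
    // and the first field assignment dereferences it (see the edge-case test above).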
diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go
index 0d25bf1d..c5e5d10f 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -136,7 +136,10 @@ func (qr *queryResult) processWorkerBatches(workerID int, batchCh chan *blockSca
         }

         for _, bs := range batch.bss {
-            qr.loadAndProcessBlock(tmpBlock, bs, qr.shards[workerID])
+            if !qr.loadAndProcessBlock(tmpBlock, bs, qr.shards[workerID]) {
+                // If load fails, continue with next block rather than stopping
+                continue
+            }
         }

         releaseBlockScanResultBatch(batch)
@@ -168,6 +171,11 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata
         return false
     }

+    // Check if readers are properly initialized
+    if p.keys == nil || p.data == nil {
+        return false
+    }
+
     // Read and decompress user keys (always needed)
     compressedKeysBuf := make([]byte, bm.keysBlock.size)
     _, err := p.keys.Read(int64(bm.keysBlock.offset), compressedKeysBuf)
@@ -316,8 +324,8 @@ func (qr *queryResult) convertBlockToResponse(block *block, seriesID common.Seri
     elemCount := len(block.userKeys)

     for i := 0; i < elemCount; i++ {
-        // Apply MaxElementSize limit from request
-        if result.Len() >= qr.request.MaxElementSize {
+        // Apply MaxElementSize limit from request (only if positive)
+        if qr.request.MaxElementSize > 0 && result.Len() >= qr.request.MaxElementSize {
             break
         }

@@ -548,7 +556,7 @@ func (qrh *QueryResponseHeap) mergeWithHeap(limit int) *QueryResponse {
         result.Tags = append(result.Tags, resp.Tags[idx])
         result.SIDs = append(result.SIDs, resp.SIDs[idx])

-        if result.Len() >= limit {
+        if limit > 0 && result.Len() >= limit {
             break
         }
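The two guards above change the meaning of a non-positive limit from "stop almost immediately" to "unlimited". The convention they implement can be stated as a tiny predicate; reachedLimit is a hypothetical helper written only to make the rule explicit, not a function in this codebase:

    // reachedLimit reports whether a result of length n has hit the cap.
    // A limit of zero or below means "no limit", mirroring the guards added
    // to convertBlockToResponse and mergeWithHeap.
    func reachedLimit(n, limit int) bool {
        return limit > 0 && n >= limit
    }

    // reachedLimit(5, 0) == false -> keep copying elements
    // reachedLimit(5, 5) == true  -> stop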
diff --git a/banyand/internal/sidx/query_result_test.go b/banyand/internal/sidx/query_result_test.go
index 3c38f8ca..c09e86d7 100644
--- a/banyand/internal/sidx/query_result_test.go
+++ b/banyand/internal/sidx/query_result_test.go
@@ -19,10 +19,14 @@ package sidx

 import (
     "container/heap"
+    "context"
     "testing"

     "github.com/apache/skywalking-banyandb/api/common"
+    "github.com/apache/skywalking-banyandb/banyand/internal/test"
+    "github.com/apache/skywalking-banyandb/pkg/logger"
     pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+    "github.com/apache/skywalking-banyandb/pkg/query/model"
 )

 func TestQueryResponseHeap_BasicOperations(t *testing.T) {
@@ -379,11 +383,10 @@ func TestQueryResponseHeap_EdgeCases(t *testing.T) {

     result := qrh.mergeWithHeap(0)

-    // NOTE: Current implementation has a bug - it copies one element before checking limit
-    // For limit=0, it should return empty result, but currently returns 1 element
-    // This test documents the current behavior
-    if result.Len() != 1 {
-        t.Errorf("Current implementation with zero limit returns 1 element (bug), got %d", result.Len())
+    // With limit=0, should return all elements since limit=0 means no limit
+    expectedLen := 3
+    if result.Len() != expectedLen {
+        t.Errorf("With zero limit (no limit), expected %d elements, got %d", expectedLen, result.Len())
     }
     if result.Len() > 0 && result.Keys[0] != 1 {
         t.Errorf("Expected first key to be 1, got %d", result.Keys[0])
@@ -640,3 +643,796 @@ func createBenchmarkResponseDesc(size int, offset, step int64) *QueryResponse {

     return response
 }
+
+// Tests for queryResult struct using memPart datasets
+
+func TestQueryResult_Pull_SingleMemPart(t *testing.T) {
+    // Create test dataset with multiple series
+    expectedElements := []testElement{
+        {seriesID: 1, userKey: 100, data: []byte("data1"), tags: []tag{{name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+        {seriesID: 2, userKey: 200, data: []byte("data2"), tags: []tag{{name: "service", value: []byte("service2"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+        {seriesID: 3, userKey: 300, data: []byte("data3"), tags: []tag{{name: "environment", value: []byte("prod"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+    }
+
+    elements := createTestElements(expectedElements)
+    defer releaseElements(elements)
+
+    // Create memory part from elements
+    mp := generateMemPart()
+    defer releaseMemPart(mp)
+    mp.mustInitFromElements(elements)
+
+    // Create part from memory part
+    testPart := openMemPart(mp)
+    defer testPart.close()
+
+    // Create snapshot and parts
+    parts := []*part{testPart}
+    snap := &snapshot{}
+    snap.parts = make([]*partWrapper, len(parts))
+    for i, p := range parts {
+        snap.parts[i] = newPartWrapper(p)
+    }
+    // Initialize reference count to 1
+    snap.ref = 1
+
+    // Create mock memory protector
+    mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
+
+    // Create query request with default MaxElementSize (0) to test the fix
+    req := QueryRequest{
+        SeriesIDs: []common.SeriesID{1, 2, 3},
+        MinKey: nil,
+        MaxKey: nil,
+        Filter: nil,
+        Order: nil,
+        MaxElementSize: 0, // Test with default value (0) to ensure fix works
+    }
+
+    // Debug: log the request parameters
+    t.Logf("Request MaxElementSize: %d", req.MaxElementSize)
+    t.Logf("Element keys: %v", []int64{100, 200, 300})
+    t.Logf("Key range filter: [%d, %d]", 50, 400)
+
+    // Create block scanner
+    bs := &blockScanner{
+        pm: mockProtector,
+        filter: req.Filter,
+        l: logger.GetLogger("test"),
+        parts: parts,
+        seriesIDs: req.SeriesIDs,
+        minKey: 50,
+        maxKey: 400,
+        asc: true,
+    }
+
+    // Create query result
+    qr := &queryResult{
+        request: req,
+        ctx: context.Background(),
+        pm: mockProtector,
+        snapshot: snap,
+        bs: bs,
+        l: logger.GetLogger("test-queryResult"),
+        parts: parts,
+        asc: true,
+        released: false,
+    }
+
+    // Debug: test if the iterator can find blocks in this memPart
+    bma := generateBlockMetadataArray()
+    defer releaseBlockMetadataArray(bma)
+
+    it := generateIter()
+    defer releaseIter(it)
+
+    it.init(bma, parts, req.SeriesIDs, 50, 400, nil)
+
+    blockCount := 0
+    for it.nextBlock() {
+        blockCount++
+    }
+    t.Logf("Iterator found %d blocks", blockCount)
+    if it.Error() != nil {
+        t.Logf("Iterator error: %v", it.Error())
+    }
+
+    // Pull the response
+    response := qr.Pull()
+
+    // Debug: check if response is nil first
+    t.Logf("Response is nil: %v", response == nil)
+
+    // Verify response is not nil
+    if response == nil {
+        t.Fatal("QueryResult.Pull() returned nil response")
+    }
+
+    // Check for errors in response
+    if response.Error != nil {
+        t.Fatalf("QueryResult returned error: %v", response.Error)
+    }
+
+    // Verify response has the expected number of elements
+    if response.Len() != len(expectedElements) {
+        t.Fatalf("Expected %d elements in response, got %d", len(expectedElements), response.Len())
+    }
+
+    // Verify data consistency
+    err := response.Validate()
+    if err != nil {
+        t.Fatalf("Response validation failed: %v", err)
+    }
+
+    // Create a map of expected data by key for easy lookup
+    expectedByKey := make(map[int64]testElement)
+    for _, elem := range expectedElements {
+        expectedByKey[elem.userKey] = elem
+    }
+
+    // Verify each returned element matches expected data
+    for i := 0; i < response.Len(); i++ {
+        key := response.Keys[i]
+        data := response.Data[i]
+        sid := response.SIDs[i]
+        tags := response.Tags[i]
+
+        // Find expected element by key
+        expected, found := expectedByKey[key]
+        if !found {
+            t.Errorf("Unexpected key %d found in response at position %d", key, i)
+            continue
+        }
+
+        // Verify series ID matches
+        if sid != expected.seriesID {
+            t.Errorf("At position %d: expected seriesID %d, got %d", i, expected.seriesID, sid)
+        }
+
+        // Verify data matches
+        if string(data) != string(expected.data) {
+            t.Errorf("At position %d: expected data %s, got %s", i, string(expected.data), string(data))
+        }
+
+        // Verify tags match
+        if len(tags) != len(expected.tags) {
+            t.Errorf("At position %d: expected %d tags, got %d", i, len(expected.tags), len(tags))
+            continue
+        }
+
+        for j, tag := range tags {
+            expectedTag := expected.tags[j]
+            if tag.name != expectedTag.name {
+                t.Errorf("At position %d, tag %d: expected name %s, got %s", i, j, expectedTag.name, tag.name)
+            }
+            if string(tag.value) != string(expectedTag.value) {
+                t.Errorf("At position %d, tag %d: expected value %s, got %s", i, j, string(expectedTag.value), string(tag.value))
+            }
+            if tag.valueType != expectedTag.valueType {
+                t.Errorf("At position %d, tag %d: expected valueType %v, got %v", i, j, expectedTag.valueType, tag.valueType)
+            }
+        }
+    }
+
+    // Verify keys are within expected range and properly sorted
+    for i, key := range response.Keys {
+        if key < 50 || key > 400 {
+            t.Errorf("Key %d at position %d is outside expected range [50, 400]", key, i)
+        }
+        if i > 0 && key <= response.Keys[i-1] {
+            t.Errorf("Keys not properly sorted: key %d at position %d should be greater than previous key %d", key, i, response.Keys[i-1])
+        }
+    }
+
+    // Clean up
+    qr.Release()
+}
+
+func TestQueryResult_Pull_MultipleMemParts(t *testing.T) {
+    // Create multiple test datasets with expected elements
+    expectedElements := []testElement{
+        {seriesID: 1, userKey: 100, data: []byte("data1_part1"), tags: []tag{{name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+        {seriesID: 2, userKey: 200, data: []byte("data2_part2"), tags: []tag{{name: "service", value: []byte("service2"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+        {seriesID: 3, userKey: 300, data: []byte("data3_part1"), tags: []tag{{name: "service", value: []byte("service3"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+        {seriesID: 4, userKey: 400, data: []byte("data4_part2"), tags: []tag{{name: "environment", value: []byte("test"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+    }
+
+    // Create test elements for each part
+    elements1 := createTestElements([]testElement{expectedElements[0], expectedElements[2]}) // data1_part1, data3_part1
+    defer releaseElements(elements1)
+
+    elements2 := createTestElements([]testElement{expectedElements[1], expectedElements[3]}) // data2_part2, data4_part2
+    defer releaseElements(elements2)
+
+    // Create memory parts
+    mp1 := generateMemPart()
+    defer releaseMemPart(mp1)
+    mp1.mustInitFromElements(elements1)
+
+    mp2 := generateMemPart()
+    defer releaseMemPart(mp2)
+    mp2.mustInitFromElements(elements2)
+
+    // Create parts from memory parts
+    testPart1 := openMemPart(mp1)
+    defer testPart1.close()
+    testPart2 := openMemPart(mp2)
+    defer testPart2.close()
+
+    // Create snapshot with multiple parts
+    parts := []*part{testPart1, testPart2}
+    snap := &snapshot{}
+    snap.parts = make([]*partWrapper, len(parts))
+    for i, p := range parts {
+        snap.parts[i] = newPartWrapper(p)
+    }
+    // Initialize reference count to 1
+    snap.ref = 1
+
+    // Create mock memory protector
+    mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
+
+    // Create query request targeting all series
+    req := QueryRequest{
+        SeriesIDs: []common.SeriesID{1, 2, 3, 4},
+        MinKey: nil,
+        MaxKey: nil,
+        Filter: nil,
+        Order: nil,
+        MaxElementSize: 0, // Test with default value (0) to ensure fix works
+    }
+
+    // Create block scanner
+    bs := &blockScanner{
+        pm: mockProtector,
+        filter: req.Filter,
+        l: logger.GetLogger("test"),
+        parts: parts,
+        seriesIDs: req.SeriesIDs,
+        minKey: 50,
+        maxKey: 500,
+        asc: true,
+    }
+
+    // Create query result
+    qr := &queryResult{
+        request: req,
+        ctx: context.Background(),
+        pm: mockProtector,
+        snapshot: snap,
+        bs: bs,
+        l: logger.GetLogger("test-queryResult"),
+        parts: parts,
+        asc: true,
+        released: false,
+    }
+
+    // Pull the response
+    response := qr.Pull()
+
+    // Verify response is not nil
+    if response == nil {
+        t.Fatal("QueryResult.Pull() returned nil response")
+    }
+
+    // Check for errors in response
+    if response.Error != nil {
+        t.Fatalf("QueryResult returned error: %v", response.Error)
+    }
+
+    // Verify response has the expected number of elements
+    if response.Len() != len(expectedElements) {
+        t.Fatalf("Expected %d elements in response, got %d", len(expectedElements), response.Len())
+    }
+
+    // Verify data consistency
+    err := response.Validate()
+    if err != nil {
+        t.Fatalf("Response validation failed: %v", err)
+    }
+
+    // Create a map of expected data by key for easy lookup
+    expectedByKey := make(map[int64]testElement)
+    for _, elem := range expectedElements {
+        expectedByKey[elem.userKey] = elem
+    }
+
+    // Verify each returned element matches expected data
+    for i := 0; i < response.Len(); i++ {
+        key := response.Keys[i]
+        data := response.Data[i]
+        sid := response.SIDs[i]
+        tags := response.Tags[i]
+
+        // Find expected element by key
+        expected, found := expectedByKey[key]
+        if !found {
+            t.Errorf("Unexpected key %d found in response at position %d", key, i)
+            continue
+        }
+
+        // Verify series ID matches
+        if sid != expected.seriesID {
+            t.Errorf("At position %d: expected seriesID %d, got %d", i, expected.seriesID, sid)
+        }
+
+        // Verify data matches
+        if string(data) != string(expected.data) {
+            t.Errorf("At position %d: expected data %s, got %s", i, string(expected.data), string(data))
+        }
+
+        // Verify tags match
+        if len(tags) != len(expected.tags) {
+            t.Errorf("At position %d: expected %d tags, got %d", i, len(expected.tags), len(tags))
+            continue
+        }
+
+        for j, tag := range tags {
+            expectedTag := expected.tags[j]
+            if tag.name != expectedTag.name {
+                t.Errorf("At position %d, tag %d: expected name %s, got %s", i, j, expectedTag.name, tag.name)
+            }
+            if string(tag.value) != string(expectedTag.value) {
+                t.Errorf("At position %d, tag %d: expected value %s, got %s", i, j, string(expectedTag.value), string(tag.value))
+            }
+            if tag.valueType != expectedTag.valueType {
+                t.Errorf("At position %d, tag %d: expected valueType %v, got %v", i, j, expectedTag.valueType, tag.valueType)
+            }
+        }
+    }
+
+    // Verify keys are within expected range and properly sorted
+    for i, key := range response.Keys {
+        if key < 50 || key > 500 {
+            t.Errorf("Key %d at position %d is outside expected range [50, 500]", key, i)
+        }
+        if i > 0 && key <= response.Keys[i-1] {
+            t.Errorf("Keys not properly sorted: key %d at position %d should be greater than previous key %d", key, i, response.Keys[i-1])
+        }
+    }
+
+    // Clean up
+    qr.Release()
+}
+
+func TestQueryResult_Pull_WithTagProjection(t *testing.T) {
+    // Create test dataset with multiple tags
+    elements := createTestElements([]testElement{
+        {
+            seriesID: 1, userKey: 100, data: []byte("data1"),
+            tags: []tag{
+                {name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true},
+                {name: "environment", value: []byte("prod"), valueType: pbv1.ValueTypeStr, indexed: true},
+                {name: "region", value: []byte("us-west"), valueType: pbv1.ValueTypeStr, indexed: true},
+            },
+        },
+        {
+            seriesID: 2, userKey: 200, data: []byte("data2"),
+            tags: []tag{
+                {name: "service", value: []byte("service2"), valueType: pbv1.ValueTypeStr, indexed: true},
+                {name: "environment", value: []byte("test"), valueType: pbv1.ValueTypeStr, indexed: true},
+                {name: "region", value: []byte("us-east"), valueType: pbv1.ValueTypeStr, indexed: true},
+            },
+        },
+    })
+    defer releaseElements(elements)
+
+    // Create memory part
+    mp := generateMemPart()
+    defer releaseMemPart(mp)
+    mp.mustInitFromElements(elements)
+
+    testPart := openMemPart(mp)
+    defer testPart.close()
+
+    // Create snapshot
+    parts := []*part{testPart}
+    snap := &snapshot{}
+    snap.parts = make([]*partWrapper, len(parts))
+    for i, p := range parts {
+        snap.parts[i] = newPartWrapper(p)
+    }
+    // Initialize reference count to 1
+    snap.ref = 1
+
+    mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
+
+    // Create query request with tag projection (only load "service" and "environment")
+    req := QueryRequest{
+        SeriesIDs: []common.SeriesID{1, 2},
+        TagProjection: []model.TagProjection{
+            {Names: []string{"service", "environment"}},
+        },
+        MinKey: nil,
+        MaxKey: nil,
+        Filter: nil,
+    }
+
+    bs := &blockScanner{
+        pm: mockProtector,
+        filter: req.Filter,
+        l: logger.GetLogger("test"),
+        parts: parts,
+        seriesIDs: req.SeriesIDs,
+        minKey: 50,
+        maxKey: 300,
+        asc: true,
+    }
+
+    qr := &queryResult{
+        request: req,
+        ctx: context.Background(),
+        pm: mockProtector,
+        snapshot: snap,
+        bs: bs,
+        l: logger.GetLogger("test-queryResult"),
+        parts: parts,
+        asc: true,
+        released: false,
+    }
+
+    // Test Pull with tag projection
+    response := qr.Pull()
+
+    if response == nil {
+        t.Fatal("Expected non-nil response")
+    }
+    if response.Error != nil {
+        t.Fatalf("Expected no error, got: %v", response.Error)
+    }
+
+    // Verify response has data
+    if response.Len() == 0 {
+        t.Error("Expected non-empty response")
+    }
+
+    // Verify tag projection worked (should only have projected tags)
+    // Note: The actual tag filtering logic needs to be verified based on implementation
+    for i, tagGroup := range response.Tags {
+        for _, tag := range tagGroup {
+            if tag.name != "service" && tag.name != "environment" {
+                t.Errorf("Unexpected tag '%s' found at position %d, should only have projected tags", tag.name, i)
+            }
+        }
+    }
+
+    qr.Release()
+}
+
+func TestQueryResult_Pull_DescendingOrder(t *testing.T) {
+    // Create test dataset
+    elements := createTestElements([]testElement{
+        {seriesID: 1, userKey: 100, data: []byte("data1"), tags: []tag{{name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+        {seriesID: 1, userKey: 200, data: []byte("data2"), tags: []tag{{name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+        {seriesID: 1, userKey: 300, data: []byte("data3"), tags: []tag{{name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+    })
+    defer releaseElements(elements)
+
+    mp := generateMemPart()
+    defer releaseMemPart(mp)
+    mp.mustInitFromElements(elements)
+
+    testPart := openMemPart(mp)
+    defer testPart.close()
+
+    parts := []*part{testPart}
+    snap := &snapshot{}
+    snap.parts = make([]*partWrapper, len(parts))
+    for i, p := range parts {
+        snap.parts[i] = newPartWrapper(p)
+    }
+    // Initialize reference count to 1
+    snap.ref = 1
+
+    mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
+
+    req := QueryRequest{
+        SeriesIDs: []common.SeriesID{1},
+        MinKey: nil,
+        MaxKey: nil,
+        Filter: nil,
+    }
+
+    // Create descending order scanner
+    bs := &blockScanner{
+        pm: mockProtector,
+        filter: req.Filter,
+        l: logger.GetLogger("test"),
+        parts: parts,
+        seriesIDs: req.SeriesIDs,
+        minKey: 50,
+        maxKey: 400,
+        asc: false, // Descending order
+    }
+
+    qr := &queryResult{
+        request: req,
+        ctx: context.Background(),
+        pm: mockProtector,
+        snapshot: snap,
+        bs: bs,
+        l: logger.GetLogger("test-queryResult"),
+        parts: parts,
+        asc: false, // Descending order
+        released: false,
+    }
+
+    response := qr.Pull()
+
+    if response == nil {
+        t.Fatal("Expected non-nil response")
+    }
+    if response.Error != nil {
+        t.Fatalf("Expected no error, got: %v", response.Error)
+    }
+
+    // Verify descending order
+    for i := 1; i < len(response.Keys); i++ {
+        if response.Keys[i] > response.Keys[i-1] {
+            t.Errorf("Keys are not in descending order: %d > %d at positions %d, %d",
+                response.Keys[i], response.Keys[i-1], i, i-1)
+        }
+    }
+
+    qr.Release()
+}
+
+func TestQueryResult_Pull_ErrorHandling(t *testing.T) {
+    // Test with quota exceeded scenario
+    mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: true}
+
+    elements := createTestElements([]testElement{
+        {seriesID: 1, userKey: 100, data: []byte("data1"), tags: []tag{{name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+    })
+    defer releaseElements(elements)
+
+    mp := generateMemPart()
+    defer releaseMemPart(mp)
+    mp.mustInitFromElements(elements)
+
+    testPart := openMemPart(mp)
+    defer testPart.close()
+
+    parts := []*part{testPart}
+    snap := &snapshot{}
+    snap.parts = make([]*partWrapper, len(parts))
+    for i, p := range parts {
+        snap.parts[i] = newPartWrapper(p)
+    }
+    // Initialize reference count to 1
+    snap.ref = 1
+
+    req := QueryRequest{
+        SeriesIDs: []common.SeriesID{1},
+    }
+
+    bs := &blockScanner{
+        pm: mockProtector,
+        filter: req.Filter,
+        l: logger.GetLogger("test"),
+        parts: parts,
+        seriesIDs: req.SeriesIDs,
+        minKey: 50,
+        maxKey: 200,
+        asc: true,
+    }
+
+    qr := &queryResult{
+        request: req,
+        ctx: context.Background(),
+        pm: mockProtector,
+        snapshot: snap,
+        bs: bs,
+        l: logger.GetLogger("test-queryResult"),
+        parts: parts,
+        asc: true,
+        released: false,
+    }
+
+    response := qr.Pull()
+
+    // With quota exceeded, we expect either an error response or a response with an error
+    if response != nil && response.Error != nil {
+        t.Logf("Expected error due to quota exceeded: %v", response.Error)
+    }
+
+    qr.Release()
+}
+
+func TestQueryResult_Release(t *testing.T) {
+    elements := createTestElements([]testElement{
+        {seriesID: 1, userKey: 100, data: []byte("data1"), tags: []tag{{name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+    })
+    defer releaseElements(elements)
+
+    mp := generateMemPart()
+    defer releaseMemPart(mp)
+    mp.mustInitFromElements(elements)
+
+    testPart := openMemPart(mp)
+    defer testPart.close()
+
+    parts := []*part{testPart}
+    snap := &snapshot{}
+    snap.parts = make([]*partWrapper, len(parts))
+    for i, p := range parts {
+        snap.parts[i] = newPartWrapper(p)
+    }
+    // Initialize reference count to 1
+    snap.ref = 1
+
+    // Create a mock blockScanner to verify it gets closed
+    mockBS := &blockScanner{
+        pm: &test.MockMemoryProtector{},
+        filter: nil,
+        l: logger.GetLogger("test"),
+        parts: parts,
+        seriesIDs: []common.SeriesID{1},
+        minKey: 50,
+        maxKey: 200,
+        asc: true,
+    }
+
+    // Create a mock protector
+    mockProtector := &test.MockMemoryProtector{}
+
+    qr := &queryResult{
+        request: QueryRequest{SeriesIDs: []common.SeriesID{1}},
+        ctx: context.Background(),
+        pm: mockProtector,
+        snapshot: snap,
+        bs: mockBS,
+        l: logger.GetLogger("test-queryResult"),
+        tagsToLoad: map[string]struct{}{"service": {}},
+        parts: parts,
+        shards: []*QueryResponse{},
+        asc: true,
+        released: false,
+    }
+
+    // Verify initial state
+    if qr.released {
+        t.Error("queryResult should not be released initially")
+    }
+    if qr.snapshot == nil {
+        t.Error("queryResult should have a snapshot initially")
+    }
+    if qr.bs == nil {
+        t.Error("queryResult should have a blockScanner initially")
+    }
+    if qr.parts == nil {
+        t.Error("queryResult should have parts initially")
+    }
+    if qr.tagsToLoad == nil {
+        t.Error("queryResult should have tagsToLoad initially")
+    }
+    if qr.shards == nil {
+        t.Error("queryResult should have shards initially")
+    }
+
+    // Store initial reference count for verification
+    initialRef := snap.ref
+
+    // Release
+    qr.Release()
+
+    // Verify released flag is set
+    if !qr.released {
+        t.Error("queryResult should be marked as released")
+    }
+
+    // Verify snapshot is nullified and reference count is decremented
+    if qr.snapshot != nil {
+        t.Error("queryResult.snapshot should be nullified after release")
+    }
+    if snap.ref != initialRef-1 {
+        t.Errorf("snapshot reference count should be decremented from %d to %d, got %d", initialRef, initialRef-1, snap.ref)
+    }
+
+    // Verify blockScanner is closed but not nullified (it gets closed in Release method but not set to nil)
+    if qr.bs == nil {
+        t.Error("queryResult.bs should not be nullified after release, only closed")
+    }
+    // Note: We can't easily verify that bs.close() was called without exposing internal state
+    // The important thing is that the blockScanner is still accessible for potential cleanup
+
+    // Verify other fields remain unchanged (they don't get nullified)
+    if qr.parts == nil {
+        t.Error("queryResult.parts should remain unchanged after release")
+    }
+    if qr.tagsToLoad == nil {
+        t.Error("queryResult.tagsToLoad should remain unchanged after release")
+    }
+    if qr.shards == nil {
+        t.Error("queryResult.shards should remain unchanged after release")
+    }
+    if qr.ctx == nil {
+        t.Error("queryResult.ctx should remain unchanged after release")
+    }
+    if qr.pm == nil {
+        t.Error("queryResult.pm should remain unchanged after release")
+    }
+    if qr.l == nil {
+        t.Error("queryResult.l should remain unchanged after release")
+    }
+
+    // Verify that subsequent calls to Release are safe (idempotent)
+    qr.Release()
+    if !qr.released {
+        t.Error("queryResult should remain released after second release call")
+    }
+    if qr.snapshot != nil {
+        t.Error("queryResult.snapshot should remain nullified after second release call")
+    }
+    if qr.bs == nil {
+        t.Error("queryResult.bs should remain accessible after second release call")
+    }
+}
+
+func TestQueryResult_Pull_ContextCancellation(t *testing.T) {
+    elements := createTestElements([]testElement{
+        {seriesID: 1, userKey: 100, data: []byte("data1"), tags: []tag{{name: "service", value: []byte("service1"), valueType: pbv1.ValueTypeStr, indexed: true}}},
+    })
+    defer releaseElements(elements)
+
+    mp := generateMemPart()
+    defer releaseMemPart(mp)
+    mp.mustInitFromElements(elements)
+
+    testPart := openMemPart(mp)
+    defer testPart.close()
+
+    parts := []*part{testPart}
+    snap := &snapshot{}
+    snap.parts = make([]*partWrapper, len(parts))
+    for i, p := range parts {
+        snap.parts[i] = newPartWrapper(p)
+    }
+    // Initialize reference count to 1
+    snap.ref = 1
+
+    mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
+
+    // Create cancelled context
+    ctx, cancel := context.WithCancel(context.Background())
+    cancel() // Cancel immediately
+
+    req := QueryRequest{
+        SeriesIDs: []common.SeriesID{1},
+    }
+
+    bs := &blockScanner{
+        pm: mockProtector,
+        filter: req.Filter,
+        l: logger.GetLogger("test"),
+        parts: parts,
+        seriesIDs: req.SeriesIDs,
+        minKey: 50,
+        maxKey: 200,
+        asc: true,
+    }
+
+    qr := &queryResult{
+        request: req,
+        ctx: ctx,
+        pm: mockProtector,
+        snapshot: snap,
+        bs: bs,
+        l: logger.GetLogger("test-queryResult"),
+        parts: parts,
+        asc: true,
+        released: false,
+    }
+
+    response := qr.Pull()
+
+    // With a cancelled context, the query should be handled gracefully.
+    // The behavior depends on the implementation: it might return nil or an error response.
+    if response != nil {
+        t.Logf("Response with cancelled context: error=%v, len=%d", response.Error, response.Len())
+    } else {
+        t.Log("Got nil response with cancelled context - this is acceptable")
+    }
+
+    qr.Release()
+}
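Taken together, the Release assertions above pin down a contract: the first Release decrements the snapshot reference count and nullifies only the snapshot pointer, the blockScanner is closed but stays reachable, and repeated calls are no-ops. A hypothetical condensation of that contract follows; the method name and the direct ref decrement are illustrative assumptions, not the actual implementation in query_result.go:

    // releaseSketch mirrors what TestQueryResult_Release checks.
    func (qr *queryResult) releaseSketch() {
        if qr.released { // second and later calls are no-ops
            return
        }
        qr.released = true
        if qr.snapshot != nil {
            qr.snapshot.ref-- // decremented exactly once
            qr.snapshot = nil // the only field that is nullified
        }
        // bs, parts, tagsToLoad, and shards remain set; bs is closed, not nil'd.
    }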