hanahmily commented on code in PR #674: URL: https://github.com/apache/skywalking-banyandb/pull/674#discussion_r2111921575
########## banyand/stream/block_scanner_test.go: ########## @@ -0,0 +1,201 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +func TestBlockScanner_QuotaExceeded(t *testing.T) { + type testCtx struct { + name string + esList []*elements + sids []common.SeriesID + want []blockMetadata + minTimestamp int64 + maxTimestamp int64 + expectQuotaExceeded bool + asc bool + } + + verify := func(t *testing.T, tt testCtx, tst *tsTable) { Review Comment: Same as the `query_by_idx_test.go` ########## banyand/measure/query_test.go: ########## @@ -1358,3 +1358,212 @@ func TestQueryResult(t *testing.T) { }) } } + +type fakeMemory struct { + acquireErr error + limit uint64 + expectQuotaExceeded bool +} + +func (f *fakeMemory) AvailableBytes() int64 { + if f.expectQuotaExceeded { + return 10 + } + return 10000 +} + +func (f *fakeMemory) GetLimit() uint64 { + return f.limit +} + +func (f *fakeMemory) AcquireResource(_ context.Context, _ uint64) error { + return f.acquireErr +} + +func (f *fakeMemory) Name() string { + return "fake-memory" +} + +func (f *fakeMemory) FlagSet() *run.FlagSet { + return run.NewFlagSet("fake-memory") +} + +func (f *fakeMemory) Validate() error { + return nil +} + +func (f *fakeMemory) PreRun(_ context.Context) error { + return nil +} + +func (f *fakeMemory) GracefulStop() { + // no-op for test +} + +func (f *fakeMemory) Serve() run.StopNotify { + ch := make(chan struct{}) + close(ch) + return ch +} + +func TestQueryResult_QuotaExceeded(t *testing.T) { + tests := []struct { + wantErr error + name string + dpsList []*dataPoints + sids []common.SeriesID + want []model.MeasureResult + minTimestamp int64 + maxTimestamp int64 + orderBySeries bool + ascTS bool + expectQuotaExceeded bool + }{ + { + name: "TestQuotaNotExceeded_ExpectSuccess", + dpsList: []*dataPoints{dpsTS1, dpsTS1}, + sids: []common.SeriesID{1}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: false, + want: []model.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []model.TagFamily{ + {Name: "arrTag", Tags: []model.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []model.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []model.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []model.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }}, + }, + { + name: "TestQuotaExceeded_ExpectError", + dpsList: []*dataPoints{dpsTS1, dpsTS1}, + sids: []common.SeriesID{1}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: true, + want: []model.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []model.TagFamily{ + {Name: "arrTag", Tags: []model.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []model.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []model.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []model.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + verify := func(t *testing.T, tst *tsTable) { Review Comment: Remove this function and integrate its logic into the `t.Run(tt.name, func(t *testing.T) {` section. ########## banyand/measure/query_test.go: ########## @@ -1358,3 +1358,212 @@ func TestQueryResult(t *testing.T) { }) } } + +type fakeMemory struct { + acquireErr error + limit uint64 + expectQuotaExceeded bool +} + +func (f *fakeMemory) AvailableBytes() int64 { + if f.expectQuotaExceeded { + return 10 + } + return 10000 +} + +func (f *fakeMemory) GetLimit() uint64 { + return f.limit +} + +func (f *fakeMemory) AcquireResource(_ context.Context, _ uint64) error { + return f.acquireErr +} + +func (f *fakeMemory) Name() string { + return "fake-memory" +} + +func (f *fakeMemory) FlagSet() *run.FlagSet { + return run.NewFlagSet("fake-memory") +} + +func (f *fakeMemory) Validate() error { + return nil +} + +func (f *fakeMemory) PreRun(_ context.Context) error { + return nil +} + +func (f *fakeMemory) GracefulStop() { + // no-op for test +} + +func (f *fakeMemory) Serve() run.StopNotify { + ch := make(chan struct{}) + close(ch) + return ch +} + +func TestQueryResult_QuotaExceeded(t *testing.T) { + tests := []struct { + wantErr error + name string + dpsList []*dataPoints + sids []common.SeriesID + want []model.MeasureResult + minTimestamp int64 + maxTimestamp int64 + orderBySeries bool + ascTS bool + expectQuotaExceeded bool + }{ + { + name: "TestQuotaNotExceeded_ExpectSuccess", + dpsList: []*dataPoints{dpsTS1, dpsTS1}, + sids: []common.SeriesID{1}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: false, + want: []model.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []model.TagFamily{ + {Name: "arrTag", Tags: []model.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []model.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []model.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []model.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }}, + }, + { + name: "TestQuotaExceeded_ExpectError", + dpsList: []*dataPoints{dpsTS1, dpsTS1}, + sids: []common.SeriesID{1}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: true, + want: []model.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []model.TagFamily{ + {Name: "arrTag", Tags: []model.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []model.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []model.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []model.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + verify := func(t *testing.T, tst *tsTable) { + m := &measure{ + pm: &fakeMemory{expectQuotaExceeded: tt.expectQuotaExceeded}, + } + defer tst.Close() + queryOpts := queryOptions{ + minTimestamp: tt.minTimestamp, + maxTimestamp: tt.maxTimestamp, + } + queryOpts.TagProjection = tagProjections[1] + queryOpts.FieldProjection = fieldProjections[1] + s := tst.currentSnapshot() + require.NotNil(t, s) + defer s.decRef() + pp, _ := s.getParts(nil, queryOpts.minTimestamp, queryOpts.maxTimestamp) + var result queryResult + result.ctx = context.TODO() + // Query all tags + result.tagProjection = allTagProjections + err := m.searchBlocks(context.TODO(), &result, tt.sids, pp, queryOpts) + if tt.expectQuotaExceeded { + require.Error(t, err) + require.Contains(t, err.Error(), "quota exceeded", "expected quota to be exceeded but got: %v", err) + return + } + require.NoError(t, err) + defer result.Release() + if tt.orderBySeries { + result.sidToIndex = make(map[common.SeriesID]int) + for i, si := range tt.sids { + result.sidToIndex[si] = i + } + } else { + result.orderByTS = true + result.ascTS = tt.ascTS + } + var got []model.MeasureResult + for { + r := result.Pull() + if r == nil { + break + } + sort.Slice(r.TagFamilies, func(i, j int) bool { + return r.TagFamilies[i].Name < r.TagFamilies[j].Name + }) + got = append(got, *r) + } + if diff := cmp.Diff(got, tt.want, + protocmp.IgnoreUnknown(), protocmp.Transform()); diff != "" { + t.Errorf("Unexpected []pbv1.Result (-got +want):\n%s", diff) + } + } + + t.Run("memory snapshot", func(t *testing.T) { Review Comment: Remove this sub-test case and integrate its logic into the `t.Run(tt.name, func(t *testing.T) {` section. ########## banyand/stream/query_by_idx_test.go: ########## @@ -0,0 +1,180 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +type fakeMemory struct { + acquireErr error + limit uint64 + expectQuotaExceeded bool +} + +func (f *fakeMemory) AvailableBytes() int64 { + if f.expectQuotaExceeded { + return 10 + } + return 10000 +} + +func (f *fakeMemory) GetLimit() uint64 { + return f.limit +} + +func (f *fakeMemory) AcquireResource(_ context.Context, _ uint64) error { + return f.acquireErr +} + +func (f *fakeMemory) Name() string { + return "fake-memory" +} + +func (f *fakeMemory) FlagSet() *run.FlagSet { + return run.NewFlagSet("fake-memory") +} + +func (f *fakeMemory) Validate() error { + return nil +} + +func (f *fakeMemory) PreRun(_ context.Context) error { + return nil +} + +func (f *fakeMemory) GracefulStop() { + // no-op for test +} + +func (f *fakeMemory) Serve() run.StopNotify { + ch := make(chan struct{}) + close(ch) + return ch +} + +func TestQueryResult_QuotaExceeded(t *testing.T) { + type testCtx struct { + name string + esList []*elements + sids []common.SeriesID + want []blockMetadata + minTimestamp int64 + maxTimestamp int64 + expectQuotaExceeded bool + } + + verify := func(t *testing.T, tt testCtx, tst *tsTable) { + defer tst.Close() + qr := &idxResult{ + pm: &fakeMemory{expectQuotaExceeded: tt.expectQuotaExceeded}, + tabs: []*tsTable{tst}, + } + qo := queryOptions{ + minTimestamp: tt.minTimestamp, + maxTimestamp: tt.maxTimestamp, + sortedSids: tt.sids, + } + err := qr.scanParts(context.TODO(), qo) + if tt.expectQuotaExceeded { + require.Error(t, err) + require.Contains(t, err.Error(), "quota exceeded", "expected quota to be exceeded but got: %v", err) + return + } + require.NoError(t, err) + var got []blockMetadata + for _, data := range qr.data { + got = append(got, data.bm) + } + if diff := cmp.Diff(got, tt.want, + cmpopts.IgnoreFields(blockMetadata{}, "timestamps"), + cmpopts.IgnoreFields(blockMetadata{}, "elementIDs"), + cmpopts.IgnoreFields(blockMetadata{}, "tagFamilies"), + cmp.AllowUnexported(blockMetadata{}), + ); diff != "" { + t.Errorf("Unexpected blockMetadata (-got +want):\n%s", diff) + } + } + + t.Run("memory snapshot", func(t *testing.T) { + tests := []testCtx{ + { + name: "TestQuotaNotExceeded_ExpectSuccess", + esList: []*elements{esTS1}, + sids: []common.SeriesID{1, 2, 3}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: false, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, + }, + }, + { + name: "TestQuotaExceeded_ExpectError", + esList: []*elements{esTS1}, + sids: []common.SeriesID{1, 2, 3}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: true, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tmpPath, defFn := test.Space(require.New(t)) + index, _ := newElementIndex(context.TODO(), tmpPath, 0, nil) + defer defFn() + tst := &tsTable{ + index: index, + loopCloser: run.NewCloser(2), + introductions: make(chan *introduction), + fileSystem: fs.NewLocalFileSystem(), + root: tmpPath, + } + tst.gc.init(tst) + flushCh := make(chan *flusherIntroduction) + mergeCh := make(chan *mergerIntroduction) + introducerWatcher := make(watcher.Channel, 1) + go tst.introducerLoop(flushCh, mergeCh, introducerWatcher, 1) Review Comment: You can refer to "measure.query_test.go" to use `newTSTable` to set up a tsTable, which starts these loops. ########## banyand/stream/query_by_idx_test.go: ########## @@ -0,0 +1,180 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +type fakeMemory struct { Review Comment: ```suggestion type mockMemoryProtector struct { ``` ########## banyand/stream/block_scanner_test.go: ########## @@ -0,0 +1,201 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +func TestBlockScanner_QuotaExceeded(t *testing.T) { + type testCtx struct { + name string + esList []*elements + sids []common.SeriesID + want []blockMetadata + minTimestamp int64 + maxTimestamp int64 + expectQuotaExceeded bool + asc bool + } + + verify := func(t *testing.T, tt testCtx, tst *tsTable) { + defer tst.Close() + workerSize := cgroups.CPUs() + var workerWg sync.WaitGroup + batchCh := make(chan *blockScanResultBatch, workerSize) + workerWg.Add(workerSize) + + qo := queryOptions{ + minTimestamp: tt.minTimestamp, + maxTimestamp: tt.maxTimestamp, + sortedSids: tt.sids, + } + var parts []*part + s := tst.currentSnapshot() + require.NotNil(t, s) + parts, _ = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp) + bsn := &blockScanner{ + parts: getDisjointParts(parts, tt.asc), + qo: qo, + asc: tt.asc, + pm: &fakeMemory{expectQuotaExceeded: tt.expectQuotaExceeded}, + newBatchFunc: func() *blockScanResultBatch { + return &blockScanResultBatch{ + bss: make([]blockScanResult, 0, 1), + } + }, + } + + var ( + got []blockMetadata + mu sync.Mutex + errSeen atomic.Bool + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for i := 0; i < workerSize; i++ { + go func() { + defer workerWg.Done() + for { + select { + case <-ctx.Done(): + return + case batch, ok := <-batchCh: + if !ok { + return + } + if batch.err != nil { + if tt.expectQuotaExceeded { + require.Error(t, batch.err) + require.Contains(t, batch.err.Error(), "quota exceeded") + errSeen.Store(true) + releaseBlockScanResultBatch(batch) + cancel() // stop all worker + return + } + require.NoError(t, batch.err) + releaseBlockScanResultBatch(batch) + continue + } + mu.Lock() + for _, bs := range batch.bss { + got = append(got, bs.bm) + } + mu.Unlock() + releaseBlockScanResultBatch(batch) + } + } + }() + } + bsn.scan(ctx, batchCh) + close(batchCh) + workerWg.Wait() + + if tt.expectQuotaExceeded { + if !errSeen.Load() { + t.Errorf("Expected quota exceeded error, but none occurred") + } + return + } + + sort.Slice(got, func(i, j int) bool { + return got[i].seriesID < got[j].seriesID + }) + + if diff := cmp.Diff(got, tt.want, + cmpopts.IgnoreFields(blockMetadata{}, "timestamps"), + cmpopts.IgnoreFields(blockMetadata{}, "elementIDs"), + cmpopts.IgnoreFields(blockMetadata{}, "tagFamilies"), + cmp.AllowUnexported(blockMetadata{}), + ); diff != "" { + t.Errorf("Unexpected blockMetadata (-got +want):\n%s", diff) + } + } + + t.Run("memory snapshot", func(t *testing.T) { Review Comment: Same as the `query_by_idx_test.go` ########## banyand/stream/block_scanner_test.go: ########## @@ -0,0 +1,201 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +func TestBlockScanner_QuotaExceeded(t *testing.T) { + type testCtx struct { + name string + esList []*elements + sids []common.SeriesID + want []blockMetadata + minTimestamp int64 + maxTimestamp int64 + expectQuotaExceeded bool + asc bool + } + + verify := func(t *testing.T, tt testCtx, tst *tsTable) { + defer tst.Close() + workerSize := cgroups.CPUs() + var workerWg sync.WaitGroup + batchCh := make(chan *blockScanResultBatch, workerSize) + workerWg.Add(workerSize) + + qo := queryOptions{ + minTimestamp: tt.minTimestamp, + maxTimestamp: tt.maxTimestamp, + sortedSids: tt.sids, + } + var parts []*part + s := tst.currentSnapshot() + require.NotNil(t, s) + parts, _ = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp) + bsn := &blockScanner{ + parts: getDisjointParts(parts, tt.asc), + qo: qo, + asc: tt.asc, + pm: &fakeMemory{expectQuotaExceeded: tt.expectQuotaExceeded}, + newBatchFunc: func() *blockScanResultBatch { + return &blockScanResultBatch{ + bss: make([]blockScanResult, 0, 1), + } + }, + } + + var ( + got []blockMetadata + mu sync.Mutex + errSeen atomic.Bool + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for i := 0; i < workerSize; i++ { + go func() { + defer workerWg.Done() + for { + select { + case <-ctx.Done(): + return + case batch, ok := <-batchCh: + if !ok { + return + } + if batch.err != nil { + if tt.expectQuotaExceeded { + require.Error(t, batch.err) + require.Contains(t, batch.err.Error(), "quota exceeded") + errSeen.Store(true) + releaseBlockScanResultBatch(batch) + cancel() // stop all worker + return + } + require.NoError(t, batch.err) + releaseBlockScanResultBatch(batch) + continue + } + mu.Lock() + for _, bs := range batch.bss { + got = append(got, bs.bm) + } + mu.Unlock() + releaseBlockScanResultBatch(batch) + } + } + }() + } + bsn.scan(ctx, batchCh) + close(batchCh) + workerWg.Wait() + + if tt.expectQuotaExceeded { + if !errSeen.Load() { + t.Errorf("Expected quota exceeded error, but none occurred") + } + return + } + + sort.Slice(got, func(i, j int) bool { + return got[i].seriesID < got[j].seriesID + }) + + if diff := cmp.Diff(got, tt.want, + cmpopts.IgnoreFields(blockMetadata{}, "timestamps"), + cmpopts.IgnoreFields(blockMetadata{}, "elementIDs"), + cmpopts.IgnoreFields(blockMetadata{}, "tagFamilies"), + cmp.AllowUnexported(blockMetadata{}), + ); diff != "" { + t.Errorf("Unexpected blockMetadata (-got +want):\n%s", diff) + } + } + + t.Run("memory snapshot", func(t *testing.T) { + tests := []testCtx{ + { + name: "TestQuotaNotExceeded_ExpectSuccess", + esList: []*elements{esTS1}, + sids: []common.SeriesID{1, 2, 3}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: false, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, + }, + }, + { + name: "TestQuotaExceeded_ExpectError", + esList: []*elements{esTS1}, + sids: []common.SeriesID{1, 2, 3}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: true, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, + }, Review Comment: ```suggestion ``` ########## banyand/stream/block_scanner_test.go: ########## @@ -0,0 +1,201 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +func TestBlockScanner_QuotaExceeded(t *testing.T) { + type testCtx struct { + name string + esList []*elements + sids []common.SeriesID + want []blockMetadata + minTimestamp int64 + maxTimestamp int64 + expectQuotaExceeded bool + asc bool + } + + verify := func(t *testing.T, tt testCtx, tst *tsTable) { + defer tst.Close() + workerSize := cgroups.CPUs() + var workerWg sync.WaitGroup + batchCh := make(chan *blockScanResultBatch, workerSize) + workerWg.Add(workerSize) + + qo := queryOptions{ + minTimestamp: tt.minTimestamp, + maxTimestamp: tt.maxTimestamp, + sortedSids: tt.sids, + } + var parts []*part + s := tst.currentSnapshot() + require.NotNil(t, s) + parts, _ = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp) + bsn := &blockScanner{ + parts: getDisjointParts(parts, tt.asc), + qo: qo, + asc: tt.asc, + pm: &fakeMemory{expectQuotaExceeded: tt.expectQuotaExceeded}, + newBatchFunc: func() *blockScanResultBatch { + return &blockScanResultBatch{ + bss: make([]blockScanResult, 0, 1), Review Comment: Why do you drop the `generateBlockScanResultBatch`? ########## banyand/stream/query_by_idx_test.go: ########## @@ -0,0 +1,180 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +type fakeMemory struct { + acquireErr error + limit uint64 + expectQuotaExceeded bool +} + +func (f *fakeMemory) AvailableBytes() int64 { + if f.expectQuotaExceeded { + return 10 + } + return 10000 +} + +func (f *fakeMemory) GetLimit() uint64 { + return f.limit +} + +func (f *fakeMemory) AcquireResource(_ context.Context, _ uint64) error { + return f.acquireErr +} + +func (f *fakeMemory) Name() string { + return "fake-memory" +} + +func (f *fakeMemory) FlagSet() *run.FlagSet { + return run.NewFlagSet("fake-memory") +} + +func (f *fakeMemory) Validate() error { + return nil +} + +func (f *fakeMemory) PreRun(_ context.Context) error { + return nil +} + +func (f *fakeMemory) GracefulStop() { + // no-op for test +} + +func (f *fakeMemory) Serve() run.StopNotify { + ch := make(chan struct{}) + close(ch) + return ch +} + +func TestQueryResult_QuotaExceeded(t *testing.T) { + type testCtx struct { + name string + esList []*elements + sids []common.SeriesID + want []blockMetadata + minTimestamp int64 + maxTimestamp int64 + expectQuotaExceeded bool + } + + verify := func(t *testing.T, tt testCtx, tst *tsTable) { Review Comment: Remove this function, then move its logic into `t.Run(tt.name, func(t *testing.T) {` ########## banyand/measure/query_test.go: ########## @@ -1358,3 +1358,212 @@ func TestQueryResult(t *testing.T) { }) } } + +type fakeMemory struct { + acquireErr error + limit uint64 + expectQuotaExceeded bool +} + +func (f *fakeMemory) AvailableBytes() int64 { + if f.expectQuotaExceeded { + return 10 + } + return 10000 +} + +func (f *fakeMemory) GetLimit() uint64 { + return f.limit +} + +func (f *fakeMemory) AcquireResource(_ context.Context, _ uint64) error { + return f.acquireErr +} + +func (f *fakeMemory) Name() string { + return "fake-memory" +} + +func (f *fakeMemory) FlagSet() *run.FlagSet { + return run.NewFlagSet("fake-memory") +} + +func (f *fakeMemory) Validate() error { + return nil +} + +func (f *fakeMemory) PreRun(_ context.Context) error { + return nil +} + +func (f *fakeMemory) GracefulStop() { + // no-op for test +} + +func (f *fakeMemory) Serve() run.StopNotify { + ch := make(chan struct{}) + close(ch) + return ch +} + +func TestQueryResult_QuotaExceeded(t *testing.T) { + tests := []struct { + wantErr error + name string + dpsList []*dataPoints + sids []common.SeriesID + want []model.MeasureResult + minTimestamp int64 + maxTimestamp int64 + orderBySeries bool + ascTS bool + expectQuotaExceeded bool + }{ + { + name: "TestQuotaNotExceeded_ExpectSuccess", + dpsList: []*dataPoints{dpsTS1, dpsTS1}, + sids: []common.SeriesID{1}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: false, + want: []model.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []model.TagFamily{ + {Name: "arrTag", Tags: []model.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []model.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []model.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []model.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }}, + }, + { + name: "TestQuotaExceeded_ExpectError", + dpsList: []*dataPoints{dpsTS1, dpsTS1}, + sids: []common.SeriesID{1}, + minTimestamp: 1, + maxTimestamp: 1, + expectQuotaExceeded: true, + want: []model.MeasureResult{{ + SID: 1, + Timestamps: []int64{1}, + Versions: []int64{1}, + TagFamilies: []model.TagFamily{ + {Name: "arrTag", Tags: []model.Tag{ + {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}}, + {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}}, + }}, + {Name: "binaryTag", Tags: []model.Tag{ + {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText)}}, + }}, + {Name: "singleTag", Tags: []model.Tag{ + {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1")}}, + {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10)}}, + {Name: "strTag1", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "strTag2", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []model.Field{ + {Name: "strField", Values: []*modelv1.FieldValue{strFieldValue("field1")}}, + {Name: "intField", Values: []*modelv1.FieldValue{int64FieldValue(1110)}}, + {Name: "floatField", Values: []*modelv1.FieldValue{float64FieldValue(1.221233343e+06)}}, + {Name: "binaryField", Values: []*modelv1.FieldValue{binaryDataFieldValue(longText)}}, + }, + }}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + verify := func(t *testing.T, tst *tsTable) { + m := &measure{ + pm: &fakeMemory{expectQuotaExceeded: tt.expectQuotaExceeded}, + } + defer tst.Close() + queryOpts := queryOptions{ + minTimestamp: tt.minTimestamp, + maxTimestamp: tt.maxTimestamp, + } + queryOpts.TagProjection = tagProjections[1] + queryOpts.FieldProjection = fieldProjections[1] + s := tst.currentSnapshot() + require.NotNil(t, s) + defer s.decRef() + pp, _ := s.getParts(nil, queryOpts.minTimestamp, queryOpts.maxTimestamp) + var result queryResult + result.ctx = context.TODO() + // Query all tags + result.tagProjection = allTagProjections + err := m.searchBlocks(context.TODO(), &result, tt.sids, pp, queryOpts) + if tt.expectQuotaExceeded { + require.Error(t, err) + require.Contains(t, err.Error(), "quota exceeded", "expected quota to be exceeded but got: %v", err) + return + } + require.NoError(t, err) + defer result.Release() + if tt.orderBySeries { + result.sidToIndex = make(map[common.SeriesID]int) + for i, si := range tt.sids { + result.sidToIndex[si] = i + } + } else { + result.orderByTS = true + result.ascTS = tt.ascTS + } + var got []model.MeasureResult + for { + r := result.Pull() + if r == nil { + break + } + sort.Slice(r.TagFamilies, func(i, j int) bool { + return r.TagFamilies[i].Name < r.TagFamilies[j].Name + }) + got = append(got, *r) + } + if diff := cmp.Diff(got, tt.want, + protocmp.IgnoreUnknown(), protocmp.Transform()); diff != "" { + t.Errorf("Unexpected []pbv1.Result (-got +want):\n%s", diff) + } + } + + t.Run("memory snapshot", func(t *testing.T) { + tmpPath, defFn := test.Space(require.New(t)) + defer defFn() + tst := &tsTable{ + loopCloser: run.NewCloser(2), + introductions: make(chan *introduction), + fileSystem: fs.NewLocalFileSystem(), + root: tmpPath, + } + tst.gc.init(tst) + flushCh := make(chan *flusherIntroduction) + mergeCh := make(chan *mergerIntroduction) + introducerWatcher := make(watcher.Channel, 1) + go tst.introducerLoop(flushCh, mergeCh, introducerWatcher, 1) Review Comment: Please use `newTSTable` to initialize the tsTable ########## banyand/stream/query_by_idx_test.go: ########## @@ -0,0 +1,180 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +type fakeMemory struct { Review Comment: Please move the mock to `banyand/internal/test`, so that both `stream` and `measure` can share it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org