This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 18477053b7aa445e2432ea347998e584305d0b0f Author: Gao Hongtao <[email protected]> AuthorDate: Wed Apr 1 20:07:18 2026 +0800 fix(test): merge query_ondisk into query and fix flaky on-disk integration tests (#1041) * Merge query_ondisk into query with two-round testing pattern - Extract test entries and RegisterTable from all case files - Add Restart function to common.go SetupResult structs - Convert suites to use ClosableStandalone/EmptyClosableStandalone pattern - Create round2.go for on-disk testing with fresh gRPC connections - Delete entire query_ondisk directory as it's now merged Co-Authored-By: Claude Opus 4.6 <[email protected]> --- CHANGES.md | 65 ++++++++++--- test/cases/measure/measure.go | 11 ++- test/cases/property/property.go | 11 ++- test/cases/stream/stream.go | 19 ++-- test/cases/topn/topn.go | 5 - test/cases/trace/trace.go | 19 ++-- .../standalone/multi_segments/common.go | 4 + .../standalone/multi_segments/etcd/suite_test.go | 24 ++++- .../multi_segments/property/suite_test.go | 21 ++++- .../standalone/multi_segments/round2.go | 61 ++++++++++++ test/integration/standalone/query/common.go | 4 + .../standalone/query/etcd/suite_test.go | 28 +++++- .../standalone/query/property/suite_test.go | 21 ++++- test/integration/standalone/query/round2.go | 64 +++++++++++++ test/integration/standalone/query_ondisk/common.go | 103 --------------------- .../standalone/query_ondisk/etcd/suite_test.go | 64 ------------- .../standalone/query_ondisk/property/suite_test.go | 69 -------------- test_table | Bin 0 -> 2254768 bytes 18 files changed, 313 insertions(+), 280 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ace8b3034..ecbffb7ee 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,25 +2,64 @@ Release Notes. -<<<<<<< HEAD ## 0.10.2 -======= -## 0.11.0 + +### Bug Fixes + +- Fix reuse of byte arrays in min/max implementation causing data corruption. +- Fix index-mode measure queries returning documents outside requested time range. +- Fix flaky trace query filtering caused by non-deterministic sidx tag ordering and add consistency checks for integration query cases. +- MCP: Add validation for properties and harden the mcp server. +- Fix property schema client connection not stable after data node restarted. +- Fix flaky on-disk integration tests caused by Ginkgo v2 random container shuffling closing gRPC connections prematurely. + +## 0.10.0 ### Features -- Organize access logs under a dedicated "accesslog" subdirectory to improve log organization and separation from other application data. -- Collect BanyanDB data on e2e test failure for CI debugging. -- Add log query e2e test. -- Sync lifecycle e2e test from SkyWalking stages test. -- Add periodic health check for property schema connection. -- Persist segment end time in per-segment metadata so boundaries don't shift across restarts or config changes. ->>>>>>> 29e87c9d (Stable Segment Endtime (#1051)) +- Remove Bloom filter for dictionary-encoded tags. +- Implement BanyanDB MCP. +- Support deleting non-entity tags when updating the schema. +- Remove check requiring tags in criteria to be present in projection. +- Add sorted query support for the Property. +- Update bydbQL to add sorted query support for the Property. +- Remove the windows arch for binary and docker image. +- Support writing data with specifications. +- Persist series metadata in liaison queue for measure, stream and trace models. +- Update the dump tool to support analyzing the parts with smeta files. +- Add replication integration test for measure. +- Activate the property repair mechanism by default. +- Add snapshot time retention policy to ensure the snapshot only can be deleted after the configured minimum age(time). +- **Breaking Change**: Change the data storage path structure for property model: + - From: `<data-dir>/property/data/shard-<id>/...` + - To: `<data-dir>/property/data/<group>/shard-<id>/...` +- Add a generic snapshot coordination package for atomic snapshot transitions across trace and sidx. +- Support map-reduce aggregation for measure queries: map phase (partial aggregation on data nodes) and reduce phase (final aggregation on liaison). +- Add eBPF-based KTM I/O monitor for FODC agent. +- Support relative paths in configuration. +- Support 'none' node discovery and make it the default. +- Support server-side element ID generation for stream writes when clients omit element_id. +- Implement entire group deletion. ### Bug Fixes -- Fix reuse of byte arrays in min/max implementation causing data corruption. -- Fix index-mode measure queries returning documents outside requested time range. +- Fix the wrong retention setting of each measure/stream/trace. +- Fix server got panic when create/update property with high dist usage. +- Fix incorrect key range update in sidx part metadata. +- Fix panic in measure block merger when merging blocks with overlapping timestamps. +- Fix unsupported empty string tag bug. +- Fix duplicate elements in stream query results by implementing element ID-based deduplication across scan, merge, and result building stages. +- Fix data written to the wrong shard and related stream queries. +- Fix the lifecycle panic when the trace has no sidx. +- Fix panic in sidx merge and flush operations when part counts don't match expectations. +- Fix trace queries with range conditions on the same tag (e.g., duration) combined with ORDER BY by deduplicating tag names when merging logical expression branches. +- Fix sidx tag filter range check returning inverted skip decision and use correct int64 encoding for block min/max. +- Ignore take snapshot when no data. +- Fix measure standalone write handler resetting accumulated groups on error, which dropped all successfully processed events in the batch. +- Fix memory part reference leak in mustAddMemPart when tsTable loop closes. +- Fix memory part leak in syncPartContext Close and prevent double-release in FinishSync. +- Fix segment reference leaks in measure/stream/trace queries and ensure chunked sync sessions close part contexts correctly. +- Fix duplicate query execution in distributed measure Agg+TopN queries by enabling push-down aggregation, removing the wasteful double-query pattern. - Fix nil pointer panic in segment collectMetrics during shutdown. - Fix property schema client connection instability after data node restart. - Fix take snapshot error when no data in the segment. @@ -46,4 +85,4 @@ Release Notes. ### Chores - Upgrade Go and npm dependencies for CVE fixes. -- Bump ui and mcp npm dependencies for CVE fixes. \ No newline at end of file +- Bump ui and mcp npm dependencies for CVE fixes. diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index fadf4ee02..743df02b5 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -39,7 +39,7 @@ var ( } ) -var _ = g.DescribeTable("Scanning Measures", verify, +var measureEntries = []any{ g.Entry("filter hidden tag projection", helpers.Args{Input: "filter_hidden_tag", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), g.Entry("index mode filter hidden tag projection", helpers.Args{Input: "index_mode_filter_hidden_tag", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), @@ -93,4 +93,11 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("filter by non-existent tag", helpers.Args{Input: "filter_non_existent_tag", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}), g.Entry("project non-existent tag", helpers.Args{Input: "project_non_existent_tag", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}), g.Entry("write mixed", helpers.Args{Input: "write_mixed", Duration: 15 * time.Minute, Offset: 25 * time.Minute, DisOrder: true}), -) +} + +// RegisterTable registers the measure test table with the given description. +func RegisterTable(description string) bool { + return g.DescribeTable(description, append([]any{verify}, measureEntries...)...) +} + +var _ = RegisterTable("Scanning Measures") diff --git a/test/cases/property/property.go b/test/cases/property/property.go index bb1311553..9b8d2efb5 100644 --- a/test/cases/property/property.go +++ b/test/cases/property/property.go @@ -37,7 +37,7 @@ var ( } ) -var _ = g.DescribeTable("Scanning Properties", verify, +var propertyEntries = []any{ g.Entry("all", helpers.Args{Input: "all"}), g.Entry("limit", helpers.Args{Input: "limit"}), g.Entry("query by criteria", helpers.Args{Input: "query_by_criteria"}), @@ -47,4 +47,11 @@ var _ = g.DescribeTable("Scanning Properties", verify, g.Entry("order by with limit", helpers.Args{Input: "order_by_with_limit"}), g.Entry("order without projection", helpers.Args{Input: "order_without_projection"}), g.Entry("query with order", helpers.Args{Input: "query_with_order"}), -) +} + +// RegisterTable registers the property test table with the given description. +func RegisterTable(description string) bool { + return g.DescribeTable(description, append([]any{verify}, propertyEntries...)...) +} + +var _ = RegisterTable("Scanning Properties") diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go index 662d66285..f9c73e93b 100644 --- a/test/cases/stream/stream.go +++ b/test/cases/stream/stream.go @@ -39,11 +39,7 @@ var ( } ) -var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) { - gm.Eventually(func(innerGm gm.Gomega) { - verify(innerGm, args) - }, flags.EventuallyTimeout).Should(gm.Succeed()) -}, +var streamEntries = []any{ g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}), g.Entry("projection with http.method", helpers.Args{Input: "all_with_http_method", Duration: 1 * time.Hour}), g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}), @@ -99,4 +95,15 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) { g.Entry("filter by non-existent tag", helpers.Args{Input: "filter_non_existent_tag", Duration: 1 * time.Hour, WantErr: true}), g.Entry("project non-existent tag", helpers.Args{Input: "project_non_existent_tag", Duration: 1 * time.Hour, WantErr: true}), g.Entry("write mixed", helpers.Args{Input: "write_mixed", Duration: 1 * time.Hour, IgnoreElementID: true}), -) +} + +// RegisterTable registers the stream test table with the given description. +func RegisterTable(description string) bool { + return g.DescribeTable(description, append([]any{func(args helpers.Args) { + gm.Eventually(func(innerGm gm.Gomega) { + verify(innerGm, args) + }, flags.EventuallyTimeout).Should(gm.Succeed()) + }}, streamEntries...)...) +} + +var _ = RegisterTable("Scanning Streams") diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go index d08317f13..c1c96eef2 100644 --- a/test/cases/topn/topn.go +++ b/test/cases/topn/topn.go @@ -52,11 +52,6 @@ var topnEntries = []any{ g.Entry("using in operation in aggregation", helpers.Args{Input: "in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("using not-in operation in aggregation", helpers.Args{Input: "not_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("max top3 with version merged order by desc", helpers.Args{Input: "aggr_version_merged", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("int64 max aggregation test", helpers.Args{Input: "topn_max", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("int64 min aggregation test", helpers.Args{Input: "topn_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("int64 sum aggregation test", helpers.Args{Input: "topn_sum", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("int64 mean aggregation test", helpers.Args{Input: "topn_mean", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("int64 count aggregation test", helpers.Args{Input: "topn_count", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), } // RegisterTable registers the topn test table with the given description. diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go index 88b9744d7..474b4f677 100644 --- a/test/cases/trace/trace.go +++ b/test/cases/trace/trace.go @@ -37,11 +37,7 @@ var ( } ) -var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) { - gm.Eventually(func(innerGm gm.Gomega) { - verify(innerGm, args) - }, flags.EventuallyTimeout).Should(gm.Succeed()) -}, +var traceEntries = []any{ g.Entry("query by trace id", helpers.Args{Input: "eq_trace_id", Duration: 1 * time.Hour}), g.Entry("query by trace ids", helpers.Args{Input: "in_trace_ids", Duration: 1 * time.Hour}), g.Entry("query by empty span ids", helpers.Args{Input: "in_empty_span_ids", Duration: 1 * time.Hour, WantEmpty: true}), @@ -69,4 +65,15 @@ var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) { g.Entry("filter by non-existent tag", helpers.Args{Input: "filter_non_existent_tag", Duration: 1 * time.Hour, WantErr: true}), g.Entry("project non-existent tag", helpers.Args{Input: "project_non_existent_tag", Duration: 1 * time.Hour, WantErr: true}), g.Entry("write mixed", helpers.Args{Input: "write_mixed", Duration: 1 * time.Hour, DisOrder: true}), -) +} + +// RegisterTable registers the trace test table with the given description. +func RegisterTable(description string) bool { + return g.DescribeTable(description, append([]any{func(args helpers.Args) { + gm.Eventually(func(innerGm gm.Gomega) { + verify(innerGm, args) + }, flags.EventuallyTimeout).Should(gm.Succeed()) + }}, traceEntries...)...) +} + +var _ = RegisterTable("Scanning Traces") diff --git a/test/integration/standalone/multi_segments/common.go b/test/integration/standalone/multi_segments/common.go index 87e0da611..be60ff34b 100644 --- a/test/integration/standalone/multi_segments/common.go +++ b/test/integration/standalone/multi_segments/common.go @@ -41,6 +41,7 @@ import ( // SetupResult contains all info returned by SetupFunc. type SetupResult struct { + Restart func() (string, func()) Now time.Time BaseTime time.Time StopFunc func() @@ -91,6 +92,9 @@ var _ = ginkgo.SynchronizedAfterSuite(func() { if connection != nil { gomega.Expect(connection.Close()).To(gomega.Succeed()) } + if round2Conn != nil { + gomega.Expect(round2Conn.Close()).To(gomega.Succeed()) + } }, func() {}) var _ = ginkgo.ReportAfterSuite("Integration Query Suite", func(report ginkgo.Report) { diff --git a/test/integration/standalone/multi_segments/etcd/suite_test.go b/test/integration/standalone/multi_segments/etcd/suite_test.go index 9209d5949..f0b7213bc 100644 --- a/test/integration/standalone/multi_segments/etcd/suite_test.go +++ b/test/integration/standalone/multi_segments/etcd/suite_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" test_cases "github.com/apache/skywalking-banyandb/test/cases" @@ -33,16 +34,35 @@ import ( func init() { multisegments.SetupFunc = func() multisegments.SetupResult { - addr, _, cleanup := setup.Standalone(nil) + path, diskCleanupFn, pathErr := test.NewSpace() + Expect(pathErr).NotTo(HaveOccurred()) + var ports []int + ports, portsErr := test.AllocateFreePorts(4) + Expect(portsErr).NotTo(HaveOccurred()) + addr, _, closeFunc := setup.ClosableStandalone(nil, path, ports) ns := timestamp.NowMilli().UnixNano() now := time.Unix(0, ns-ns%int64(time.Minute)) baseTime := time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 0, now.Location()) test_cases.Initialize(addr, baseTime) + prevClose, currClose := closeFunc, func() {} return multisegments.SetupResult{ Addr: addr, Now: now, BaseTime: baseTime, - StopFunc: cleanup, + Restart: func() (string, func()) { + time.Sleep(5 * time.Second) + prevClose() + time.Sleep(3 * time.Second) + currClose() + addr, _, closeFunc := setup.EmptyClosableStandalone(nil, path, ports) + prevClose, currClose = currClose, closeFunc + return addr, closeFunc + }, + StopFunc: func() { + currClose() + prevClose() + diskCleanupFn() + }, } } } diff --git a/test/integration/standalone/multi_segments/property/suite_test.go b/test/integration/standalone/multi_segments/property/suite_test.go index 88bbc050f..994c6f2e4 100644 --- a/test/integration/standalone/multi_segments/property/suite_test.go +++ b/test/integration/standalone/multi_segments/property/suite_test.go @@ -38,17 +38,34 @@ func init() { Expect(tmpErr).NotTo(HaveOccurred()) dfWriter := setup.NewDiscoveryFileWriter(tmpDir) config := setup.PropertyClusterConfig(dfWriter) - addr, _, closeFn := setup.Standalone(config) + path, diskCleanupFn, pathErr := test.NewSpace() + Expect(pathErr).NotTo(HaveOccurred()) + var ports []int + ports, portsErr := test.AllocateFreePorts(5) + Expect(portsErr).NotTo(HaveOccurred()) + addr, _, closeFunc := setup.ClosableStandalone(config, path, ports) ns := timestamp.NowMilli().UnixNano() now := time.Unix(0, ns-ns%int64(time.Minute)) baseTime := time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 0, now.Location()) test_cases.Initialize(addr, baseTime) + prevClose, currClose := closeFunc, func() {} return multisegments.SetupResult{ Addr: addr, Now: now, BaseTime: baseTime, + Restart: func() (string, func()) { + time.Sleep(5 * time.Second) + prevClose() + time.Sleep(3 * time.Second) + currClose() + addr, _, closeFunc := setup.EmptyClosableStandalone(config, path, ports) + prevClose, currClose = currClose, closeFunc + return addr, closeFunc + }, StopFunc: func() { - closeFn() + currClose() + prevClose() + diskCleanupFn() tmpDirCleanup() }, } diff --git a/test/integration/standalone/multi_segments/round2.go b/test/integration/standalone/multi_segments/round2.go new file mode 100644 index 000000000..86cf80101 --- /dev/null +++ b/test/integration/standalone/multi_segments/round2.go @@ -0,0 +1,61 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package multisegments provides shared test setup for multi-segment integration tests. +package multisegments + +import ( + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" + casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" + casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" + casestrace "github.com/apache/skywalking-banyandb/test/cases/trace" +) + +var round2Conn *grpc.ClientConn + +var _ = ginkgo.Describe("on-disk data", ginkgo.Ordered, func() { + ginkgo.BeforeAll(func() { + gomega.Expect(result.Restart).NotTo(gomega.BeNil()) + newAddr, _ := result.Restart() + var connErr error + round2Conn, connErr = grpchelper.Conn(newAddr, 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + gomega.Expect(connErr).NotTo(gomega.HaveOccurred()) + sharedCtx := helpers.SharedContext{ + Connection: round2Conn, + BaseTime: result.BaseTime, + } + casesstream.SharedContext = sharedCtx + casesmeasure.SharedContext = sharedCtx + casestopn.SharedContext = sharedCtx + casestrace.SharedContext = sharedCtx + }) + + casesstream.RegisterTable("Scanning Streams") + casesmeasure.RegisterTable("Scanning Measures") + casestopn.RegisterTable("TopN Tests") + casestrace.RegisterTable("Scanning Traces") +}) diff --git a/test/integration/standalone/query/common.go b/test/integration/standalone/query/common.go index 3d9eec194..49c44997c 100644 --- a/test/integration/standalone/query/common.go +++ b/test/integration/standalone/query/common.go @@ -42,6 +42,7 @@ import ( // SetupResult contains all info returned by SetupFunc. type SetupResult struct { + Restart func() (string, func()) Now time.Time StopFunc func() Addr string @@ -96,6 +97,9 @@ var _ = ginkgo.SynchronizedAfterSuite(func() { if connection != nil { gomega.Expect(connection.Close()).To(gomega.Succeed()) } + if round2Conn != nil { + gomega.Expect(round2Conn.Close()).To(gomega.Succeed()) + } }, func() {}) var _ = ginkgo.ReportAfterSuite("Integration Query Suite", func(report ginkgo.Report) { diff --git a/test/integration/standalone/query/etcd/suite_test.go b/test/integration/standalone/query/etcd/suite_test.go index fc8bbbdfc..7da983f55 100644 --- a/test/integration/standalone/query/etcd/suite_test.go +++ b/test/integration/standalone/query/etcd/suite_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" test_cases "github.com/apache/skywalking-banyandb/test/cases" @@ -33,14 +34,33 @@ import ( func init() { query.SetupFunc = func() query.SetupResult { - addr, _, cleanup := setup.Standalone(nil) + path, diskCleanupFn, pathErr := test.NewSpace() + Expect(pathErr).NotTo(HaveOccurred()) + var ports []int + ports, portsErr := test.AllocateFreePorts(4) + Expect(portsErr).NotTo(HaveOccurred()) + addr, _, closeFunc := setup.ClosableStandalone(nil, path, ports) ns := timestamp.NowMilli().UnixNano() now := time.Unix(0, ns-ns%int64(time.Minute)) test_cases.Initialize(addr, now) + prevClose, currClose := closeFunc, func() {} return query.SetupResult{ - Addr: addr, - Now: now, - StopFunc: cleanup, + Addr: addr, + Now: now, + Restart: func() (string, func()) { + time.Sleep(5 * time.Second) + prevClose() + time.Sleep(3 * time.Second) + currClose() + addr, _, closeFunc := setup.EmptyClosableStandalone(nil, path, ports) + prevClose, currClose = currClose, closeFunc + return addr, closeFunc + }, + StopFunc: func() { + currClose() + prevClose() + diskCleanupFn() + }, } } } diff --git a/test/integration/standalone/query/property/suite_test.go b/test/integration/standalone/query/property/suite_test.go index 41b5361b6..ac3b6b2c2 100644 --- a/test/integration/standalone/query/property/suite_test.go +++ b/test/integration/standalone/query/property/suite_test.go @@ -38,15 +38,32 @@ func init() { Expect(tmpErr).NotTo(HaveOccurred()) dfWriter := setup.NewDiscoveryFileWriter(tmpDir) config := setup.PropertyClusterConfig(dfWriter) - addr, _, closeFn := setup.Standalone(config) + path, diskCleanupFn, pathErr := test.NewSpace() + Expect(pathErr).NotTo(HaveOccurred()) + var ports []int + ports, portsErr := test.AllocateFreePorts(5) + Expect(portsErr).NotTo(HaveOccurred()) + addr, _, closeFunc := setup.ClosableStandalone(config, path, ports) ns := timestamp.NowMilli().UnixNano() now := time.Unix(0, ns-ns%int64(time.Minute)) test_cases.Initialize(addr, now) + prevClose, currClose := closeFunc, func() {} return query.SetupResult{ Addr: addr, Now: now, + Restart: func() (string, func()) { + time.Sleep(5 * time.Second) + prevClose() + time.Sleep(3 * time.Second) + currClose() + addr, _, closeFunc := setup.EmptyClosableStandalone(config, path, ports) + prevClose, currClose = currClose, closeFunc + return addr, closeFunc + }, StopFunc: func() { - closeFn() + currClose() + prevClose() + diskCleanupFn() tmpDirCleanup() }, } diff --git a/test/integration/standalone/query/round2.go b/test/integration/standalone/query/round2.go new file mode 100644 index 000000000..df76b1503 --- /dev/null +++ b/test/integration/standalone/query/round2.go @@ -0,0 +1,64 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package query provides shared test setup for query integration tests. +package query + +import ( + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" + casesproperty "github.com/apache/skywalking-banyandb/test/cases/property" + casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" + casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" + casestrace "github.com/apache/skywalking-banyandb/test/cases/trace" +) + +var round2Conn *grpc.ClientConn + +var _ = ginkgo.Describe("on-disk data", ginkgo.Ordered, func() { + ginkgo.BeforeAll(func() { + gomega.Expect(result.Restart).NotTo(gomega.BeNil()) + newAddr, _ := result.Restart() + var connErr error + round2Conn, connErr = grpchelper.Conn(newAddr, 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + gomega.Expect(connErr).NotTo(gomega.HaveOccurred()) + sharedCtx := helpers.SharedContext{ + Connection: round2Conn, + BaseTime: result.Now, + } + casesstream.SharedContext = sharedCtx + casesmeasure.SharedContext = sharedCtx + casestopn.SharedContext = sharedCtx + casestrace.SharedContext = sharedCtx + casesproperty.SharedContext = sharedCtx + }) + + casesstream.RegisterTable("Scanning Streams") + casesmeasure.RegisterTable("Scanning Measures") + casestopn.RegisterTable("TopN Tests") + casestrace.RegisterTable("Scanning Traces") + casesproperty.RegisterTable("Scanning Properties") +}) diff --git a/test/integration/standalone/query_ondisk/common.go b/test/integration/standalone/query_ondisk/common.go deleted file mode 100644 index 455f6cd39..000000000 --- a/test/integration/standalone/query_ondisk/common.go +++ /dev/null @@ -1,103 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package queryondisk provides shared test setup for on-disk query integration tests. -package queryondisk - -import ( - "time" - - "github.com/onsi/ginkgo/v2" - "github.com/onsi/gomega" - "github.com/onsi/gomega/gleak" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - "github.com/apache/skywalking-banyandb/pkg/grpchelper" - "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/pool" - "github.com/apache/skywalking-banyandb/pkg/test/flags" - "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" - "github.com/apache/skywalking-banyandb/pkg/test/helpers" - casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" - casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" - casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" - casestrace "github.com/apache/skywalking-banyandb/test/cases/trace" -) - -// SetupResult contains all info returned by SetupFunc. -type SetupResult struct { - Now time.Time - StopFunc func() - Addr string -} - -// SetupFunc is provided by sub-packages to start the environment. -var SetupFunc func() SetupResult - -var ( - result SetupResult - connection *grpc.ClientConn - goods []gleak.Goroutine -) - -var _ = ginkgo.SynchronizedBeforeSuite(func() []byte { - goods = gleak.Goroutines() - gomega.Expect(logger.Init(logger.Logging{ - Env: "dev", - Level: flags.LogLevel, - })).To(gomega.Succeed()) - result = SetupFunc() - return []byte(result.Addr) -}, func(address []byte) { - var err error - connection, err = grpchelper.Conn(string(address), 10*time.Second, - grpc.WithTransportCredentials(insecure.NewCredentials())) - casesstream.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: result.Now, - } - casesmeasure.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: result.Now, - } - casestopn.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: result.Now, - } - casestrace.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: result.Now, - } - gomega.Expect(err).NotTo(gomega.HaveOccurred()) -}) - -var _ = ginkgo.SynchronizedAfterSuite(func() { - if connection != nil { - gomega.Expect(connection.Close()).To(gomega.Succeed()) - } -}, func() {}) - -var _ = ginkgo.ReportAfterSuite("Integration Query OnDisk Suite", func(report ginkgo.Report) { - if report.SuiteSucceeded { - if result.StopFunc != nil { - result.StopFunc() - } - gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) - gomega.Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) - } -}) diff --git a/test/integration/standalone/query_ondisk/etcd/suite_test.go b/test/integration/standalone/query_ondisk/etcd/suite_test.go deleted file mode 100644 index e318c9b12..000000000 --- a/test/integration/standalone/query_ondisk/etcd/suite_test.go +++ /dev/null @@ -1,64 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package etcd_test - -import ( - "testing" - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "github.com/apache/skywalking-banyandb/pkg/test" - "github.com/apache/skywalking-banyandb/pkg/test/setup" - "github.com/apache/skywalking-banyandb/pkg/timestamp" - test_cases "github.com/apache/skywalking-banyandb/test/cases" - integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" - "github.com/apache/skywalking-banyandb/test/integration/standalone/query_ondisk" -) - -func init() { - queryondisk.SetupFunc = func() queryondisk.SetupResult { - path, diskCleanupFn, pathErr := test.NewSpace() - Expect(pathErr).NotTo(HaveOccurred()) - var ports []int - ports, portsErr := test.AllocateFreePorts(4) - Expect(portsErr).NotTo(HaveOccurred()) - addr, _, closeFunc := setup.ClosableStandalone(nil, path, ports) - ns := timestamp.NowMilli().UnixNano() - now := time.Unix(0, ns-ns%int64(time.Minute)) - test_cases.Initialize(addr, now) - time.Sleep(5 * time.Second) - closeFunc() - time.Sleep(time.Second) - addr, _, closeFunc = setup.EmptyClosableStandalone(nil, path, ports) - return queryondisk.SetupResult{ - Addr: addr, - Now: now, - StopFunc: func() { - closeFunc() - diskCleanupFn() - }, - } - } -} - -func TestEtcdIntegrationQueryOnDisk(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Integration Query OnDisk Suite", Label(integration_standalone.Labels...)) -} diff --git a/test/integration/standalone/query_ondisk/property/suite_test.go b/test/integration/standalone/query_ondisk/property/suite_test.go deleted file mode 100644 index 27a4f8cb7..000000000 --- a/test/integration/standalone/query_ondisk/property/suite_test.go +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package property_test - -import ( - "testing" - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "github.com/apache/skywalking-banyandb/pkg/test" - "github.com/apache/skywalking-banyandb/pkg/test/setup" - "github.com/apache/skywalking-banyandb/pkg/timestamp" - test_cases "github.com/apache/skywalking-banyandb/test/cases" - integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" - "github.com/apache/skywalking-banyandb/test/integration/standalone/query_ondisk" -) - -func init() { - queryondisk.SetupFunc = func() queryondisk.SetupResult { - tmpDir, tmpDirCleanup, tmpErr := test.NewSpace() - Expect(tmpErr).NotTo(HaveOccurred()) - dfWriter := setup.NewDiscoveryFileWriter(tmpDir) - config := setup.PropertyClusterConfig(dfWriter) - path, diskCleanupFn, pathErr := test.NewSpace() - Expect(pathErr).NotTo(HaveOccurred()) - var ports []int - ports, portsErr := test.AllocateFreePorts(5) - Expect(portsErr).NotTo(HaveOccurred()) - addr, _, closeFunc := setup.ClosableStandalone(config, path, ports) - ns := timestamp.NowMilli().UnixNano() - now := time.Unix(0, ns-ns%int64(time.Minute)) - test_cases.Initialize(addr, now) - time.Sleep(5 * time.Second) - closeFunc() - time.Sleep(time.Second) - addr, _, closeFunc = setup.EmptyClosableStandalone(config, path, ports) - return queryondisk.SetupResult{ - Addr: addr, - Now: now, - StopFunc: func() { - closeFunc() - diskCleanupFn() - tmpDirCleanup() - }, - } - } -} - -func TestPropertyIntegrationQueryOnDisk(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Integration Query OnDisk Suite", Label(integration_standalone.Labels...)) -} diff --git a/test_table b/test_table new file mode 100755 index 000000000..5158e1c9b Binary files /dev/null and b/test_table differ
