This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 171758dbc50b912ee4b260ee87e016010cc2984f Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 6 15:33:53 2026 +0000 test(schema): re-enable §4.6.2/§4.6.4/§6.8/§6.11 in distributed mode (Phase 2 Step 2.5) Drops the four g.Skip(... ModeDistributed ...) guards added during Phase 1 / Phase 2 Steps 2.2–2.4: §4.6.2 and §4.6.4 in clamp.go (clamp falsification + Write→Query baseline) and §6.8 / §6.11 in shape_break.go (shape-break delete+apply, delete-then-recreate). Each spec previously flaked at the data-node executor's LoadGroup with STATUS_INTERNAL_ERROR because the property schemaCache.notifiedModRevision watermark advanced before pkg/schema.schemaRepo applied the event under eventCh-retry. With Step 2.5 §1+§2 in place, the cluster barrier consults the registry view of executor-tracked kinds, so AwaitSchemaApplied returns true only once schemaRepo's groupMap / resourceMap holds the key. The schema barrier does not cover data visibility, however: the data-node write ack can return before the new memPart is visible to the query fan-out. The Write→Query baseline checks in §4.6.4, §6.8 (pre-delete) and §6.11 (pre-delete and post-recreate) are therefore wrapped in gm.Eventually (5 s timeout, 50 ms poll), mirroring the retry pattern already used in deletion.go. The four specs now run unguarded and the distributed integration suite reports 28 / 28 with 0 skipped. Distributed verification: go test -tags integration -timeout 30m -count=1 \ ./test/integration/distributed/schema/... Will run 28 of 28 specs Ran 28 of 28 Specs in 90.849 seconds SUCCESS! 
-- 28 Passed | 0 Failed | 0 Pending | 0 Skipped via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- test/cases/schema/clamp.go | 33 ++++++++------------- test/cases/schema/shape_break.go | 64 ++++++++++++++++++++++++---------------- 2 files changed, 50 insertions(+), 47 deletions(-) diff --git a/test/cases/schema/clamp.go b/test/cases/schema/clamp.go index 53def2478..e1f014ee8 100644 --- a/test/cases/schema/clamp.go +++ b/test/cases/schema/clamp.go @@ -31,7 +31,6 @@ import ( measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" - "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -116,15 +115,6 @@ var _ = g.Describe("Schema time-range clamp", func() { // server clamps Begin forward to CreatedAt and the query executes successfully. // Since no data was written the response has zero elements but no error. g.It("succeeds and returns zero elements when query spans schema CreatedAt (§4.6.2)", func() { - // Phase 2.2 cluster barrier converges schema state across nodes, but - // this spec's dispatched query still races the data-node cache update - // in distributed mode (`group not found` returned by the executor). - // First-attempt re-enable in §RE-1 surfaced the flake on the second - // distributed run; restored until Step 2.5's cluster query gate - // removes the residual race. 
- if SharedContext.Mode == helpers.ModeDistributed { - g.Skip("§4.6.2 requires the cluster-wide query gate (Phase 2 Step 2.5)") - } groupName := fmt.Sprintf("clamp-span-%d", time.Now().UnixNano()) streamName := "clamp_stream" @@ -224,13 +214,6 @@ var _ = g.Describe("Schema time-range clamp", func() { // inside [Begin, End] and the datum would leak — proving the clamp is actually // applied rather than merely consistent with an already-in-range write. g.It("clips TimeRange.Begin to max(CreatedAt) and excludes pre-creation data (§4.6.4)", func() { - // Phase 2.2 barrier ensures schema is on every node, but this spec's - // baseline sanity step (Create → Write → Query expecting 1 datum) - // still races the data-node write path in distributed mode. Re-enable - // once Step 2.5 (cluster query gate) lands. - if SharedContext.Mode == helpers.ModeDistributed { - g.Skip("§4.6.4 requires the cluster-wide query gate (Phase 2 Step 2.5)") - } group1 := fmt.Sprintf("clamp-leak1-%d", time.Now().UnixNano()) group2 := fmt.Sprintf("clamp-leak2-%d", time.Now().UnixNano()) measureName := "clamp_measure" @@ -261,10 +244,18 @@ var _ = g.Describe("Schema time-range clamp", func() { "write at T_data1 must return STATUS_SUCCEED") g.By("Sanity-checking that querying group1 alone returns the datum (legacy unclamped baseline — pass modRevision=0)") - baselineResp, baselineErr := queryMeasureRange(ctx, clients.MeasureWriteClient, group1, measureName, - tData1.Add(-time.Hour), time.Now().Add(time.Hour), 0) - gm.Expect(baselineErr).ShouldNot(gm.HaveOccurred()) - gm.Expect(baselineResp.GetDataPoints()).Should(gm.HaveLen(1), + // In distributed mode the write→query path is asynchronous: the + // data-node ack returns when mustAddMemPart's applied channel + // fires, but the query fan-out can still race the new memPart + // becoming visible. Mirrors the deletion.go retry pattern. 
+ gm.Eventually(func() int { + baselineResp, baselineErr := queryMeasureRange(ctx, clients.MeasureWriteClient, group1, measureName, + tData1.Add(-time.Hour), time.Now().Add(time.Hour), 0) + if baselineErr != nil { + return -1 + } + return len(baselineResp.GetDataPoints()) + }, 5*time.Second, 50*time.Millisecond).Should(gm.Equal(1), "baseline single-group query must return the written datum — otherwise §4.6.4 is not falsifying anything") g.By("Creating newer group2 and measure (CreatedAt2 > T_data1)") diff --git a/test/cases/schema/shape_break.go b/test/cases/schema/shape_break.go index 11693fee0..70712a32b 100644 --- a/test/cases/schema/shape_break.go +++ b/test/cases/schema/shape_break.go @@ -31,7 +31,6 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -160,13 +159,6 @@ var _ = g.Describe("Schema shape-break rejection", func() { // §6.8: shape-break — delete+apply new shape creates the new measure (Rule 7 clamp end-to-end). g.It("shape-break: delete+apply new shape creates the new measure (§6.8)", func() { - // Phase 2.2 cluster barrier confirms schema propagation across nodes, - // but this spec's Write→Query baseline races the data-node write path - // independently of the schema barrier. Re-enable in distributed mode - // once Step 2.5 (cluster query gate) lands. 
- if SharedContext.Mode == helpers.ModeDistributed { - g.Skip("§6.8 requires the cluster-wide query gate (Phase 2 Step 2.5)") - } groupName := fmt.Sprintf("sb-new-%d", time.Now().UnixNano()) measureName := "throughput" @@ -209,10 +201,24 @@ var _ = g.Describe("Schema shape-break rejection", func() { "initial write must return STATUS_SUCCEED") g.By("Querying [CreatedAt1, now+1h] — must return exactly 1 data point") - queryResp1, queryErr1 := queryMeasureRange(ctx, clients.MeasureWriteClient, groupName, measureName, - createdAt1.AsTime(), time.Now().Add(time.Hour), r1) - gm.Expect(queryErr1).ShouldNot(gm.HaveOccurred()) - gm.Expect(queryResp1.GetDataPoints()).Should(gm.HaveLen(1), "sanity baseline: pre-delete query must return 1 data point") + // In distributed mode the write→query path is asynchronous: the + // data-node write callback returns after mustAddMemPart's applied + // channel fires, but the write batch round-trip and the query + // fan-out can still race for a few hundred milliseconds before the + // new memPart is visible across every queryable shard. The + // schema-consistency suite already uses this Eventually pattern + // in deletion.go for the same reason; mirror it here. + var queryResp1 *measurev1.QueryResponse + gm.Eventually(func() int { + var queryErr1 error + queryResp1, queryErr1 = queryMeasureRange(ctx, clients.MeasureWriteClient, groupName, measureName, + createdAt1.AsTime(), time.Now().Add(time.Hour), r1) + if queryErr1 != nil { + return -1 + } + return len(queryResp1.GetDataPoints()) + }, 5*time.Second, 50*time.Millisecond).Should(gm.Equal(1), + "sanity baseline: pre-delete query must return 1 data point") g.By("Deleting measure → T_del; awaiting deletion") deleteResp, deleteErr := clients.MeasureRegClient.Delete(ctx, &databasev1.MeasureRegistryServiceDeleteRequest{ @@ -408,12 +414,6 @@ var _ = g.Describe("Schema shape-break rejection", func() { // §6.11: delete-then-recreate original shape drops old data (Rule 7 clamp). 
g.It("delete-then-recreate original shape drops old data (§6.11)", func() { - // Same write→query race as §6.8 — Phase 2.2's barrier ensures schema - // coherence but the post-recreate Write→Query baseline still flakes - // without the cluster query gate. Re-enable once Step 2.5 lands. - if SharedContext.Mode == helpers.ModeDistributed { - g.Skip("§6.11 requires the cluster-wide query gate (Phase 2 Step 2.5)") - } groupName := fmt.Sprintf("sb-same-%d", time.Now().UnixNano()) measureName := "throughput" @@ -453,10 +453,17 @@ var _ = g.Describe("Schema shape-break rejection", func() { gm.Expect(writeErr1).ShouldNot(gm.HaveOccurred()) gm.Expect(writeStatus1).Should(gm.Equal(modelv1.Status_STATUS_SUCCEED.String())) - queryResp1, queryErr1 := queryMeasureRange(ctx, clients.MeasureWriteClient, groupName, measureName, - createdAt1.AsTime(), time.Now().Add(time.Hour), r1) - gm.Expect(queryErr1).ShouldNot(gm.HaveOccurred()) - gm.Expect(queryResp1.GetDataPoints()).Should(gm.HaveLen(1), "baseline: pre-delete query returns 1 data point") + // Eventually retry — see the §6.8 baseline above for the + // distributed write→query visibility race. + gm.Eventually(func() int { + queryResp1, queryErr1 := queryMeasureRange(ctx, clients.MeasureWriteClient, groupName, measureName, + createdAt1.AsTime(), time.Now().Add(time.Hour), r1) + if queryErr1 != nil { + return -1 + } + return len(queryResp1.GetDataPoints()) + }, 5*time.Second, 50*time.Millisecond).Should(gm.Equal(1), + "baseline: pre-delete query returns 1 data point") g.By("Deleting measure → T_del; awaiting deletion") deleteResp, deleteErr := clients.MeasureRegClient.Delete(ctx, &databasev1.MeasureRegistryServiceDeleteRequest{ @@ -508,10 +515,15 @@ var _ = g.Describe("Schema shape-break rejection", func() { "write with R2 must succeed") // Post-write query [CreatedAt2, now+1h] must return the newly-written point. 
- queryResp3, queryErr3 := queryMeasureRange(ctx, clients.MeasureWriteClient, groupName, measureName, - createdAt2.AsTime(), time.Now().Add(time.Hour), r2) - gm.Expect(queryErr3).ShouldNot(gm.HaveOccurred()) - gm.Expect(queryResp3.GetDataPoints()).Should(gm.HaveLen(1), + // Same write→query visibility race as the §6.8 baseline. + gm.Eventually(func() int { + queryResp3, queryErr3 := queryMeasureRange(ctx, clients.MeasureWriteClient, groupName, measureName, + createdAt2.AsTime(), time.Now().Add(time.Hour), r2) + if queryErr3 != nil { + return -1 + } + return len(queryResp3.GetDataPoints()) + }, 5*time.Second, 50*time.Millisecond).Should(gm.Equal(1), "post-creation write must be queryable after AwaitRevision(R2)") _, _ = clients.GroupClient.Delete(ctx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
