This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 171758dbc50b912ee4b260ee87e016010cc2984f
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 6 15:33:53 2026 +0000

    test(schema): re-enable §4.6.2/§4.6.4/§6.8/§6.11 in distributed mode (Phase 2 Step 2.5)
    
    Drops the four g.Skip(... ModeDistributed ...) guards added during
    Phase 1 / Phase 2 Steps 2.2–2.4: §4.6.2 and §4.6.4 in clamp.go (clamp
    falsification + Write→Query baseline) and §6.8 / §6.11 in shape_break.go
    (shape-break delete+apply, delete-then-recreate). Each spec previously
    flaked at the data-node executor's LoadGroup with STATUS_INTERNAL_ERROR
    because the property schemaCache.notifiedModRevision watermark advanced
    before pkg/schema.schemaRepo applied the event under eventCh-retry.
    
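    With Step 2.5 §1+§2 in place, the cluster barrier consults the registry
    view of executor-tracked kinds, so AwaitSchemaApplied returns true only
    once schemaRepo's groupMap / resourceMap holds the key. The four specs
    now run unguarded and the distributed integration suite reports
    28 / 28 with 0 skipped.
    
    The schema barrier does not cover data visibility, however: a write
    can be acknowledged before the new memPart is visible to the query
    fan-out. The Write→Query baseline assertions in §4.6.4, §6.8 and
    §6.11 are therefore wrapped in gm.Eventually (5s timeout, 50ms
    polling), mirroring the retry pattern already used in deletion.go.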
    
    Distributed verification:
      go test -tags integration -timeout 30m -count=1 \
        ./test/integration/distributed/schema/...
      Will run 28 of 28 specs
      Ran 28 of 28 Specs in 90.849 seconds
      SUCCESS! -- 28 Passed | 0 Failed | 0 Pending | 0 Skipped
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 test/cases/schema/clamp.go       | 33 ++++++++-------------
 test/cases/schema/shape_break.go | 64 ++++++++++++++++++++++++----------------
 2 files changed, 50 insertions(+), 47 deletions(-)

diff --git a/test/cases/schema/clamp.go b/test/cases/schema/clamp.go
index 53def2478..e1f014ee8 100644
--- a/test/cases/schema/clamp.go
+++ b/test/cases/schema/clamp.go
@@ -31,7 +31,6 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -116,15 +115,6 @@ var _ = g.Describe("Schema time-range clamp", func() {
        // server clamps Begin forward to CreatedAt and the query executes 
successfully.
        // Since no data was written the response has zero elements but no 
error.
        g.It("succeeds and returns zero elements when query spans schema 
CreatedAt (§4.6.2)", func() {
-               // Phase 2.2 cluster barrier converges schema state across 
nodes, but
-               // this spec's dispatched query still races the data-node cache 
update
-               // in distributed mode (`group not found` returned by the 
executor).
-               // First-attempt re-enable in §RE-1 surfaced the flake on the 
second
-               // distributed run; restored until Step 2.5's cluster query gate
-               // removes the residual race.
-               if SharedContext.Mode == helpers.ModeDistributed {
-                       g.Skip("§4.6.2 requires the cluster-wide query gate 
(Phase 2 Step 2.5)")
-               }
                groupName := fmt.Sprintf("clamp-span-%d", time.Now().UnixNano())
                streamName := "clamp_stream"
 
@@ -224,13 +214,6 @@ var _ = g.Describe("Schema time-range clamp", func() {
        // inside [Begin, End] and the datum would leak — proving the clamp is 
actually
        // applied rather than merely consistent with an already-in-range write.
        g.It("clips TimeRange.Begin to max(CreatedAt) and excludes pre-creation 
data (§4.6.4)", func() {
-               // Phase 2.2 barrier ensures schema is on every node, but this 
spec's
-               // baseline sanity step (Create → Write → Query expecting 1 
datum)
-               // still races the data-node write path in distributed mode. 
Re-enable
-               // once Step 2.5 (cluster query gate) lands.
-               if SharedContext.Mode == helpers.ModeDistributed {
-                       g.Skip("§4.6.4 requires the cluster-wide query gate 
(Phase 2 Step 2.5)")
-               }
                group1 := fmt.Sprintf("clamp-leak1-%d", time.Now().UnixNano())
                group2 := fmt.Sprintf("clamp-leak2-%d", time.Now().UnixNano())
                measureName := "clamp_measure"
@@ -261,10 +244,18 @@ var _ = g.Describe("Schema time-range clamp", func() {
                        "write at T_data1 must return STATUS_SUCCEED")
 
                g.By("Sanity-checking that querying group1 alone returns the 
datum (legacy unclamped baseline — pass modRevision=0)")
-               baselineResp, baselineErr := queryMeasureRange(ctx, 
clients.MeasureWriteClient, group1, measureName,
-                       tData1.Add(-time.Hour), time.Now().Add(time.Hour), 0)
-               gm.Expect(baselineErr).ShouldNot(gm.HaveOccurred())
-               gm.Expect(baselineResp.GetDataPoints()).Should(gm.HaveLen(1),
+               // In distributed mode the write→query path is asynchronous: the
+               // data-node ack returns when mustAddMemPart's applied channel
+               // fires, but the query fan-out can still race the new memPart
+               // becoming visible. Mirrors the deletion.go retry pattern.
+               gm.Eventually(func() int {
+                       baselineResp, baselineErr := queryMeasureRange(ctx, 
clients.MeasureWriteClient, group1, measureName,
+                               tData1.Add(-time.Hour), 
time.Now().Add(time.Hour), 0)
+                       if baselineErr != nil {
+                               return -1
+                       }
+                       return len(baselineResp.GetDataPoints())
+               }, 5*time.Second, 50*time.Millisecond).Should(gm.Equal(1),
                        "baseline single-group query must return the written 
datum — otherwise §4.6.4 is not falsifying anything")
 
                g.By("Creating newer group2 and measure (CreatedAt2 > T_data1)")
diff --git a/test/cases/schema/shape_break.go b/test/cases/schema/shape_break.go
index 11693fee0..70712a32b 100644
--- a/test/cases/schema/shape_break.go
+++ b/test/cases/schema/shape_break.go
@@ -31,7 +31,6 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -160,13 +159,6 @@ var _ = g.Describe("Schema shape-break rejection", func() {
 
        // §6.8: shape-break — delete+apply new shape creates the new measure 
(Rule 7 clamp end-to-end).
        g.It("shape-break: delete+apply new shape creates the new measure 
(§6.8)", func() {
-               // Phase 2.2 cluster barrier confirms schema propagation across 
nodes,
-               // but this spec's Write→Query baseline races the data-node 
write path
-               // independently of the schema barrier. Re-enable in 
distributed mode
-               // once Step 2.5 (cluster query gate) lands.
-               if SharedContext.Mode == helpers.ModeDistributed {
-                       g.Skip("§6.8 requires the cluster-wide query gate 
(Phase 2 Step 2.5)")
-               }
                groupName := fmt.Sprintf("sb-new-%d", time.Now().UnixNano())
                measureName := "throughput"
 
@@ -209,10 +201,24 @@ var _ = g.Describe("Schema shape-break rejection", func() 
{
                        "initial write must return STATUS_SUCCEED")
 
                g.By("Querying [CreatedAt1, now+1h] — must return exactly 1 
data point")
-               queryResp1, queryErr1 := queryMeasureRange(ctx, 
clients.MeasureWriteClient, groupName, measureName,
-                       createdAt1.AsTime(), time.Now().Add(time.Hour), r1)
-               gm.Expect(queryErr1).ShouldNot(gm.HaveOccurred())
-               gm.Expect(queryResp1.GetDataPoints()).Should(gm.HaveLen(1), 
"sanity baseline: pre-delete query must return 1 data point")
+               // In distributed mode the write→query path is asynchronous: the
+               // data-node write callback returns after mustAddMemPart's 
applied
+               // channel fires, but the write batch round-trip and the query
+               // fan-out can still race for a few hundred milliseconds before 
the
+               // new memPart is visible across every queryable shard. The
+               // schema-consistency suite already uses this Eventually pattern
+               // in deletion.go for the same reason; mirror it here.
+               var queryResp1 *measurev1.QueryResponse
+               gm.Eventually(func() int {
+                       var queryErr1 error
+                       queryResp1, queryErr1 = queryMeasureRange(ctx, 
clients.MeasureWriteClient, groupName, measureName,
+                               createdAt1.AsTime(), time.Now().Add(time.Hour), 
r1)
+                       if queryErr1 != nil {
+                               return -1
+                       }
+                       return len(queryResp1.GetDataPoints())
+               }, 5*time.Second, 50*time.Millisecond).Should(gm.Equal(1),
+                       "sanity baseline: pre-delete query must return 1 data 
point")
 
                g.By("Deleting measure → T_del; awaiting deletion")
                deleteResp, deleteErr := clients.MeasureRegClient.Delete(ctx, 
&databasev1.MeasureRegistryServiceDeleteRequest{
@@ -408,12 +414,6 @@ var _ = g.Describe("Schema shape-break rejection", func() {
 
        // §6.11: delete-then-recreate original shape drops old data (Rule 7 
clamp).
        g.It("delete-then-recreate original shape drops old data (§6.11)", 
func() {
-               // Same write→query race as §6.8 — Phase 2.2's barrier ensures 
schema
-               // coherence but the post-recreate Write→Query baseline still 
flakes
-               // without the cluster query gate. Re-enable once Step 2.5 
lands.
-               if SharedContext.Mode == helpers.ModeDistributed {
-                       g.Skip("§6.11 requires the cluster-wide query gate 
(Phase 2 Step 2.5)")
-               }
                groupName := fmt.Sprintf("sb-same-%d", time.Now().UnixNano())
                measureName := "throughput"
 
@@ -453,10 +453,17 @@ var _ = g.Describe("Schema shape-break rejection", func() 
{
                gm.Expect(writeErr1).ShouldNot(gm.HaveOccurred())
                
gm.Expect(writeStatus1).Should(gm.Equal(modelv1.Status_STATUS_SUCCEED.String()))
 
-               queryResp1, queryErr1 := queryMeasureRange(ctx, 
clients.MeasureWriteClient, groupName, measureName,
-                       createdAt1.AsTime(), time.Now().Add(time.Hour), r1)
-               gm.Expect(queryErr1).ShouldNot(gm.HaveOccurred())
-               gm.Expect(queryResp1.GetDataPoints()).Should(gm.HaveLen(1), 
"baseline: pre-delete query returns 1 data point")
+               // Eventually retry — see the §6.8 baseline above for the
+               // distributed write→query visibility race.
+               gm.Eventually(func() int {
+                       queryResp1, queryErr1 := queryMeasureRange(ctx, 
clients.MeasureWriteClient, groupName, measureName,
+                               createdAt1.AsTime(), time.Now().Add(time.Hour), 
r1)
+                       if queryErr1 != nil {
+                               return -1
+                       }
+                       return len(queryResp1.GetDataPoints())
+               }, 5*time.Second, 50*time.Millisecond).Should(gm.Equal(1),
+                       "baseline: pre-delete query returns 1 data point")
 
                g.By("Deleting measure → T_del; awaiting deletion")
                deleteResp, deleteErr := clients.MeasureRegClient.Delete(ctx, 
&databasev1.MeasureRegistryServiceDeleteRequest{
@@ -508,10 +515,15 @@ var _ = g.Describe("Schema shape-break rejection", func() 
{
                        "write with R2 must succeed")
 
                // Post-write query [CreatedAt2, now+1h] must return the 
newly-written point.
-               queryResp3, queryErr3 := queryMeasureRange(ctx, 
clients.MeasureWriteClient, groupName, measureName,
-                       createdAt2.AsTime(), time.Now().Add(time.Hour), r2)
-               gm.Expect(queryErr3).ShouldNot(gm.HaveOccurred())
-               gm.Expect(queryResp3.GetDataPoints()).Should(gm.HaveLen(1),
+               // Same write→query visibility race as the §6.8 baseline.
+               gm.Eventually(func() int {
+                       queryResp3, queryErr3 := queryMeasureRange(ctx, 
clients.MeasureWriteClient, groupName, measureName,
+                               createdAt2.AsTime(), time.Now().Add(time.Hour), 
r2)
+                       if queryErr3 != nil {
+                               return -1
+                       }
+                       return len(queryResp3.GetDataPoints())
+               }, 5*time.Second, 50*time.Millisecond).Should(gm.Equal(1),
                        "post-creation write must be queryable after 
AwaitRevision(R2)")
 
                _, _ = clients.GroupClient.Delete(ctx, 
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})

Reply via email to