This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 4b9ac7545 test: add deletion verification tests (#968)
4b9ac7545 is described below
commit 4b9ac7545a7084514bdaea7f0b8b5b7b13b745a5
Author: Tanay Paul <[email protected]>
AuthorDate: Thu Feb 26 15:37:53 2026 +0530
test: add deletion verification tests (#968)
* test: add deletion verification tests
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
test/cases/schema/deletion.go | 777 +++++++++++++++++++++
.../distributed/schema/schema_suite_test.go | 130 ++++
.../standalone/schema/schema_suite_test.go | 87 +++
3 files changed, 994 insertions(+)
diff --git a/test/cases/schema/deletion.go b/test/cases/schema/deletion.go
new file mode 100644
index 000000000..21b30d66c
--- /dev/null
+++ b/test/cases/schema/deletion.go
@@ -0,0 +1,777 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package schema contains shared test cases for schema-related functionality.
+package schema
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "strconv"
+ "time"
+
+ g "github.com/onsi/ginkgo/v2"
+ gm "github.com/onsi/gomega"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+)
+
+// SharedContext is set by the test environment (standalone or distributed).
+// The specs in this package read its Connection field to build their gRPC
+// clients, so it must be assigned before any spec runs.
+var SharedContext helpers.SharedContext
+
+// Clients holds all necessary gRPC clients for deletion tests.
+// The specs build every client from the single connection in SharedContext.
+type Clients struct {
+ // Registry clients used to create, look up, list and delete schema objects.
+ GroupClient databasev1.GroupRegistryServiceClient
+ MeasureRegClient databasev1.MeasureRegistryServiceClient
+ StreamRegClient databasev1.StreamRegistryServiceClient
+ TraceRegClient databasev1.TraceRegistryServiceClient
+ IndexRuleClient databasev1.IndexRuleRegistryServiceClient
+ IndexRuleBindingClient databasev1.IndexRuleBindingRegistryServiceClient
+ // Data service clients. Despite the "Write" in the field names, the helpers
+ // in this file use them for both Write and Query calls.
+ MeasureWriteClient measurev1.MeasureServiceClient
+ StreamWriteClient streamv1.StreamServiceClient
+ TraceWriteClient tracev1.TraceServiceClient
+}
+
+// Shared test cases. Automatically registered when this package is imported.
+//
+// Each spec creates a uniquely named group, registers one schema in it and
+// runs the matching Verify*Deletion flow. The group is removed through
+// DeferCleanup, so it is deleted even when an assertion fails midway.
+var _ = g.Describe("Schema deletion", func() {
+	var (
+		ctx     context.Context
+		clients *Clients
+	)
+
+	g.BeforeEach(func() {
+		ctx = context.Background()
+		conn := SharedContext.Connection
+		clients = &Clients{
+			GroupClient:            databasev1.NewGroupRegistryServiceClient(conn),
+			MeasureRegClient:       databasev1.NewMeasureRegistryServiceClient(conn),
+			StreamRegClient:        databasev1.NewStreamRegistryServiceClient(conn),
+			TraceRegClient:         databasev1.NewTraceRegistryServiceClient(conn),
+			IndexRuleClient:        databasev1.NewIndexRuleRegistryServiceClient(conn),
+			IndexRuleBindingClient: databasev1.NewIndexRuleBindingRegistryServiceClient(conn),
+			MeasureWriteClient:     measurev1.NewMeasureServiceClient(conn),
+			StreamWriteClient:      streamv1.NewStreamServiceClient(conn),
+			TraceWriteClient:       tracev1.NewTraceServiceClient(conn),
+		}
+	})
+
+	// createGroup registers a group of the given catalog (2 shards, 1-day
+	// segments, 7-day TTL), fails the spec if creation errors, and schedules
+	// the group's deletion for when the spec finishes.
+	createGroup := func(groupName string, catalog commonv1.Catalog) {
+		// Capture the current values: the outer variables are reassigned by
+		// the next spec's BeforeEach.
+		groupClient, cleanupCtx := clients.GroupClient, ctx
+		_, err := groupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{
+			Group: &commonv1.Group{
+				Metadata: &commonv1.Metadata{Name: groupName},
+				Catalog:  catalog,
+				ResourceOpts: &commonv1.ResourceOpts{
+					ShardNum:        2,
+					SegmentInterval: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 1},
+					Ttl:             &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 7},
+				},
+			},
+		})
+		gm.Expect(err).ShouldNot(gm.HaveOccurred())
+		g.DeferCleanup(func() {
+			// Best-effort cleanup: a failure to delete the group must not
+			// mask the spec's own result, so the error is ignored on purpose.
+			_, _ = groupClient.Delete(cleanupCtx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+		})
+	}
+
+	g.It("should delete measure correctly", func() {
+		groupName := fmt.Sprintf("del-measure-%d", time.Now().UnixNano())
+		measureName := "test_measure"
+
+		g.By("Creating measure group")
+		createGroup(groupName, commonv1.Catalog_CATALOG_MEASURE)
+
+		g.By("Creating measure schema")
+		err := createMeasureSchema(ctx, clients.MeasureRegClient, groupName, measureName)
+		gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+		g.By("Verifying measure deletion")
+		err = VerifyMeasureDeletion(ctx, clients, groupName, measureName)
+		gm.Expect(err).ShouldNot(gm.HaveOccurred())
+	})
+
+	g.It("should delete stream correctly", func() {
+		groupName := fmt.Sprintf("del-stream-%d", time.Now().UnixNano())
+		streamName := "test_stream"
+
+		g.By("Creating stream group")
+		createGroup(groupName, commonv1.Catalog_CATALOG_STREAM)
+
+		g.By("Creating stream schema")
+		err := createStreamSchema(ctx, clients, groupName, streamName)
+		gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+		g.By("Verifying stream deletion")
+		err = VerifyStreamDeletion(ctx, clients, groupName, streamName)
+		gm.Expect(err).ShouldNot(gm.HaveOccurred())
+	})
+
+	g.It("should delete trace correctly", func() {
+		groupName := fmt.Sprintf("del-trace-%d", time.Now().UnixNano())
+		traceName := "test_trace"
+
+		g.By("Creating trace group")
+		createGroup(groupName, commonv1.Catalog_CATALOG_TRACE)
+
+		g.By("Creating trace schema")
+		err := createTraceSchema(ctx, clients, groupName, traceName)
+		gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+		g.By("Verifying trace deletion")
+		err = VerifyTraceDeletion(ctx, clients, groupName, traceName)
+		gm.Expect(err).ShouldNot(gm.HaveOccurred())
+	})
+})
+
+// VerifyMeasureDeletion runs the full deletion scenario for a measure:
+// seed data, delete the schema, check the deletion is observable, then make
+// sure a freshly created sibling measure in the same group can still be
+// written and queried. The returned error names the step that failed.
+func VerifyMeasureDeletion(ctx context.Context, clients *Clients, groupName, measureName string) error {
+	// Step 1: seed the measure that is about to be deleted.
+	if seedErr := writeMeasureData(ctx, clients.MeasureWriteClient, groupName, measureName, 5); seedErr != nil {
+		return fmt.Errorf("step 1 failed - write initial data: %w", seedErr)
+	}
+
+	// Step 2: delete it and require the server to confirm.
+	target := &commonv1.Metadata{Name: measureName, Group: groupName}
+	deleteResp, deleteErr := clients.MeasureRegClient.Delete(ctx, &databasev1.MeasureRegistryServiceDeleteRequest{Metadata: target})
+	switch {
+	case deleteErr != nil:
+		return fmt.Errorf("step 2 failed - delete measure: %w", deleteErr)
+	case !deleteResp.Deleted:
+		return fmt.Errorf("step 2 failed - deletion not confirmed")
+	}
+
+	// Step 3: the deleted measure must be invisible and reject writes.
+	if effectsErr := verifyMeasureDeletionEffects(ctx, clients, groupName, measureName); effectsErr != nil {
+		return fmt.Errorf("step 3 failed: %w", effectsErr)
+	}
+
+	// Step 4: a second measure in the same group must still accept writes.
+	sibling := measureName + "_second"
+	if createErr := createMeasureSchema(ctx, clients.MeasureRegClient, groupName, sibling); createErr != nil {
+		return fmt.Errorf("step 4 failed - create second measure: %w", createErr)
+	}
+	for batch := 0; batch < 20; batch++ {
+		writeErr := writeMeasureData(ctx, clients.MeasureWriteClient, groupName, sibling, 5)
+		if writeErr != nil {
+			return fmt.Errorf("step 4 failed - write batch %d: %w", batch, writeErr)
+		}
+	}
+
+	// Step 5: wait, then poll the query up to ten times, one second apart.
+	time.Sleep(5 * time.Second)
+	const maxAttempts = 10
+	for attempt := 1; ; attempt++ {
+		queryErr := verifyMeasureQuery(ctx, clients.MeasureWriteClient, groupName, sibling, 100)
+		if queryErr == nil {
+			return nil
+		}
+		time.Sleep(1 * time.Second)
+		if attempt == maxAttempts {
+			return fmt.Errorf("step 5 failed - verify query: %w", queryErr)
+		}
+	}
+}
+
+// VerifyStreamDeletion runs the full deletion scenario for a stream:
+// seed data, delete the schema, check the deletion is observable, then make
+// sure a freshly created sibling stream in the same group is usable.
+//
+// If the sibling's query never returns elements, the function falls back to
+// checking that the sibling exists and accepts a write; it only fails when
+// that fallback fails as well.
+func VerifyStreamDeletion(ctx context.Context, clients *Clients, groupName, streamName string) error {
+	// Step 1: seed the stream that is about to be deleted.
+	if seedErr := writeStreamData(ctx, clients.StreamWriteClient, groupName, streamName, 5); seedErr != nil {
+		return fmt.Errorf("step 1 failed - write initial data: %w", seedErr)
+	}
+
+	// Step 2: delete it and require the server to confirm.
+	target := &commonv1.Metadata{Name: streamName, Group: groupName}
+	deleteResp, deleteErr := clients.StreamRegClient.Delete(ctx, &databasev1.StreamRegistryServiceDeleteRequest{Metadata: target})
+	switch {
+	case deleteErr != nil:
+		return fmt.Errorf("step 2 failed - delete stream: %w", deleteErr)
+	case !deleteResp.Deleted:
+		return fmt.Errorf("step 2 failed - deletion not confirmed")
+	}
+
+	// Step 3: the deleted stream must be invisible and reject writes.
+	if effectsErr := verifyStreamDeletionEffects(ctx, clients, groupName, streamName); effectsErr != nil {
+		return fmt.Errorf("step 3 failed: %w", effectsErr)
+	}
+
+	// Step 4: a second stream in the same group must still accept writes.
+	sibling := streamName + "_second"
+	if createErr := createStreamSchema(ctx, clients, groupName, sibling); createErr != nil {
+		return fmt.Errorf("step 4 failed - create second stream: %w", createErr)
+	}
+	for batch := 0; batch < 20; batch++ {
+		writeErr := writeStreamData(ctx, clients.StreamWriteClient, groupName, sibling, 5)
+		if writeErr != nil {
+			return fmt.Errorf("step 4 failed - write batch %d: %w", batch, writeErr)
+		}
+	}
+
+	// Step 5: wait, then poll the query up to thirty times, one second apart.
+	time.Sleep(5 * time.Second)
+	var queryErr error
+	for remaining := 30; remaining > 0; remaining-- {
+		if queryErr = verifyStreamQuery(ctx, clients.StreamWriteClient, groupName, sibling, "svc_0", 100); queryErr == nil {
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
+	if queryErr == nil {
+		return nil
+	}
+
+	// The query never succeeded: accept the run if the sibling is at least
+	// registered and writable.
+	fallbackErr := verifyStreamAvailableAndWritable(ctx, clients, groupName, sibling)
+	if fallbackErr == nil {
+		return nil
+	}
+	return fmt.Errorf("step 5 failed - verify query: %w; fallback verification failed: %w", queryErr, fallbackErr)
+}
+
+// VerifyTraceDeletion runs the full deletion scenario for a trace:
+// seed data, delete the schema, check the deletion is observable, then make
+// sure a freshly created sibling trace in the same group can still be
+// written and queried. The returned error names the step that failed.
+func VerifyTraceDeletion(ctx context.Context, clients *Clients, groupName, traceName string) error {
+	// Step 1: seed the trace that is about to be deleted.
+	if seedErr := writeTraceData(ctx, clients.TraceWriteClient, groupName, traceName, 5); seedErr != nil {
+		return fmt.Errorf("step 1 failed - write initial data: %w", seedErr)
+	}
+
+	// Step 2: delete it and require the server to confirm.
+	target := &commonv1.Metadata{Name: traceName, Group: groupName}
+	deleteResp, deleteErr := clients.TraceRegClient.Delete(ctx, &databasev1.TraceRegistryServiceDeleteRequest{Metadata: target})
+	switch {
+	case deleteErr != nil:
+		return fmt.Errorf("step 2 failed - delete trace: %w", deleteErr)
+	case !deleteResp.Deleted:
+		return fmt.Errorf("step 2 failed - deletion not confirmed")
+	}
+
+	// Step 3: the deleted trace must be invisible and reject writes.
+	if effectsErr := verifyTraceDeletionEffects(ctx, clients, groupName, traceName); effectsErr != nil {
+		return fmt.Errorf("step 3 failed: %w", effectsErr)
+	}
+
+	// Step 4: a second trace in the same group must still accept writes.
+	sibling := traceName + "_second"
+	if createErr := createTraceSchema(ctx, clients, groupName, sibling); createErr != nil {
+		return fmt.Errorf("step 4 failed - create second trace: %w", createErr)
+	}
+	for batch := 0; batch < 20; batch++ {
+		writeErr := writeTraceData(ctx, clients.TraceWriteClient, groupName, sibling, 5)
+		if writeErr != nil {
+			return fmt.Errorf("step 4 failed - write batch %d: %w", batch, writeErr)
+		}
+	}
+
+	// Step 5: wait, then poll the query up to ten times, one second apart.
+	time.Sleep(5 * time.Second)
+	const maxAttempts = 10
+	for attempt := 1; ; attempt++ {
+		queryErr := verifyTraceQuery(ctx, clients.TraceWriteClient, groupName, sibling, "trace_0", 100)
+		if queryErr == nil {
+			return nil
+		}
+		time.Sleep(1 * time.Second)
+		if attempt == maxAttempts {
+			return fmt.Errorf("step 5 failed - verify query: %w", queryErr)
+		}
+	}
+}
+
+// Helper functions.
+
+// createMeasureSchema registers a measure with a single string entity tag
+// "id" in tag family "default" and one int field "value".
+//
+// On success it pauses for two seconds before returning; a creation error is
+// returned immediately, matching createStreamSchema and createTraceSchema.
+func createMeasureSchema(ctx context.Context, client databasev1.MeasureRegistryServiceClient, groupName, measureName string) error {
+	_, err := client.Create(ctx, &databasev1.MeasureRegistryServiceCreateRequest{
+		Measure: &databasev1.Measure{
+			Metadata: &commonv1.Metadata{Name: measureName, Group: groupName},
+			Entity:   &databasev1.Entity{TagNames: []string{"id"}},
+			TagFamilies: []*databasev1.TagFamilySpec{{
+				Name: "default",
+				Tags: []*databasev1.TagSpec{{Name: "id", Type: databasev1.TagType_TAG_TYPE_STRING}},
+			}},
+			Fields: []*databasev1.FieldSpec{{
+				Name:              "value",
+				FieldType:         databasev1.FieldType_FIELD_TYPE_INT,
+				EncodingMethod:    databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+				CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+			}},
+		},
+	})
+	if err != nil {
+		// Nothing was created, so there is nothing to wait for.
+		return err
+	}
+	// NOTE(review): presumably gives the new schema time to reach the nodes
+	// that serve writes - confirm against the server's propagation model.
+	time.Sleep(2 * time.Second)
+	return nil
+}
+
+// createStreamSchema registers a stream with a single string entity tag
+// "svc", an inverted index rule on that tag, and a binding that attaches the
+// rule to the stream. The three objects are created in that order and the
+// first error aborts the sequence. On success it pauses for two seconds
+// before returning.
+func createStreamSchema(ctx context.Context, clients *Clients, groupName, streamName string) error {
+	ruleName := streamName + "_svc_idx"
+
+	streamReq := &databasev1.StreamRegistryServiceCreateRequest{
+		Stream: &databasev1.Stream{
+			Metadata: &commonv1.Metadata{Name: streamName, Group: groupName},
+			Entity:   &databasev1.Entity{TagNames: []string{"svc"}},
+			TagFamilies: []*databasev1.TagFamilySpec{{
+				Name: "default",
+				Tags: []*databasev1.TagSpec{{Name: "svc", Type: databasev1.TagType_TAG_TYPE_STRING}},
+			}},
+		},
+	}
+	ruleReq := &databasev1.IndexRuleRegistryServiceCreateRequest{
+		IndexRule: &databasev1.IndexRule{
+			Metadata: &commonv1.Metadata{Name: ruleName, Group: groupName},
+			Tags:     []string{"svc"},
+			Type:     databasev1.IndexRule_TYPE_INVERTED,
+		},
+	}
+	bindingReq := &databasev1.IndexRuleBindingRegistryServiceCreateRequest{
+		IndexRuleBinding: &databasev1.IndexRuleBinding{
+			Metadata: &commonv1.Metadata{Name: streamName + "_binding", Group: groupName},
+			Rules:    []string{ruleName},
+			Subject: &databasev1.Subject{
+				Catalog: commonv1.Catalog_CATALOG_STREAM,
+				Name:    streamName,
+			},
+			BeginAt:  timestamppb.New(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)),
+			ExpireAt: timestamppb.New(time.Date(2121, 1, 1, 0, 0, 0, 0, time.UTC)),
+		},
+	}
+
+	if _, streamErr := clients.StreamRegClient.Create(ctx, streamReq); streamErr != nil {
+		return streamErr
+	}
+	if _, ruleErr := clients.IndexRuleClient.Create(ctx, ruleReq); ruleErr != nil {
+		return ruleErr
+	}
+	if _, bindingErr := clients.IndexRuleBindingClient.Create(ctx, bindingReq); bindingErr != nil {
+		return bindingErr
+	}
+	time.Sleep(2 * time.Second)
+	return nil
+}
+
+// createTraceSchema registers a trace with the tags trace_id, span_id,
+// timestamp, service_id and duration, an inverted index rule on trace_id,
+// and a binding that attaches the rule to the trace. The three objects are
+// created in that order and the first error aborts the sequence. On success
+// it pauses for two seconds before returning.
+func createTraceSchema(ctx context.Context, clients *Clients, groupName, traceName string) error {
+	ruleName := traceName + "_trace_id_idx"
+
+	traceReq := &databasev1.TraceRegistryServiceCreateRequest{
+		Trace: &databasev1.Trace{
+			Metadata: &commonv1.Metadata{Name: traceName, Group: groupName},
+			Tags: []*databasev1.TraceTagSpec{
+				{Name: "trace_id", Type: databasev1.TagType_TAG_TYPE_STRING},
+				{Name: "span_id", Type: databasev1.TagType_TAG_TYPE_STRING},
+				{Name: "timestamp", Type: databasev1.TagType_TAG_TYPE_TIMESTAMP},
+				{Name: "service_id", Type: databasev1.TagType_TAG_TYPE_STRING},
+				{Name: "duration", Type: databasev1.TagType_TAG_TYPE_INT},
+			},
+			TraceIdTagName:   "trace_id",
+			SpanIdTagName:    "span_id",
+			TimestampTagName: "timestamp",
+		},
+	}
+	ruleReq := &databasev1.IndexRuleRegistryServiceCreateRequest{
+		IndexRule: &databasev1.IndexRule{
+			Metadata: &commonv1.Metadata{Name: ruleName, Group: groupName},
+			Tags:     []string{"trace_id"},
+			Type:     databasev1.IndexRule_TYPE_INVERTED,
+		},
+	}
+	bindingReq := &databasev1.IndexRuleBindingRegistryServiceCreateRequest{
+		IndexRuleBinding: &databasev1.IndexRuleBinding{
+			Metadata: &commonv1.Metadata{Name: traceName + "_binding", Group: groupName},
+			Rules:    []string{ruleName},
+			Subject: &databasev1.Subject{
+				Catalog: commonv1.Catalog_CATALOG_TRACE,
+				Name:    traceName,
+			},
+			BeginAt:  timestamppb.New(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)),
+			ExpireAt: timestamppb.New(time.Date(2121, 1, 1, 0, 0, 0, 0, time.UTC)),
+		},
+	}
+
+	if _, traceErr := clients.TraceRegClient.Create(ctx, traceReq); traceErr != nil {
+		return traceErr
+	}
+	if _, ruleErr := clients.IndexRuleClient.Create(ctx, ruleReq); ruleErr != nil {
+		return ruleErr
+	}
+	if _, bindingErr := clients.IndexRuleBindingClient.Create(ctx, bindingReq); bindingErr != nil {
+		return bindingErr
+	}
+	time.Sleep(2 * time.Second)
+	return nil
+}
+
+func writeMeasureData(ctx context.Context, client
measurev1.MeasureServiceClient, groupName, measureName string, count int) error
{
+ writeClient, err := client.Write(ctx)
+ if err != nil {
+ return err
+ }
+ metadata := &commonv1.Metadata{Name: measureName, Group: groupName}
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < count; idx++ {
+ if err := writeClient.Send(&measurev1.WriteRequest{
+ Metadata: metadata,
+ DataPoint: &measurev1.DataPointValue{
+ Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+ TagFamilies: []*modelv1.TagFamilyForWrite{{
+ Tags: []*modelv1.TagValue{{
+ Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "id_" + strconv.Itoa(idx)}},
+ }},
+ }},
+ Fields: []*modelv1.FieldValue{{
+ Value: &modelv1.FieldValue_Int{Int:
&modelv1.Int{Value: int64(idx * 100)}},
+ }},
+ },
+ MessageId: uint64(time.Now().UnixNano() + int64(idx)),
+ }); err != nil {
+ return err
+ }
+ }
+ if err := writeClient.CloseSend(); err != nil {
+ return err
+ }
+ for {
+ resp, recvErr := writeClient.Recv()
+ if errors.Is(recvErr, io.EOF) {
+ break
+ }
+ if recvErr != nil {
+ return recvErr
+ }
+ if resp != nil && resp.Status !=
modelv1.Status_STATUS_SUCCEED.String() {
+ return fmt.Errorf("write failed with status: %s",
resp.Status)
+ }
+ }
+ return nil
+}
+
+func writeStreamData(ctx context.Context, client streamv1.StreamServiceClient,
groupName, streamName string, count int) error {
+ writeClient, err := client.Write(ctx)
+ if err != nil {
+ return err
+ }
+ metadata := &commonv1.Metadata{Name: streamName, Group: groupName}
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < count; idx++ {
+ if err := writeClient.Send(&streamv1.WriteRequest{
+ Metadata: metadata,
+ Element: &streamv1.ElementValue{
+ ElementId:
strconv.Itoa(int(time.Now().UnixNano()) + idx),
+ Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+ TagFamilies: []*modelv1.TagFamilyForWrite{{
+ Tags: []*modelv1.TagValue{{
+ Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_" + strconv.Itoa(idx)}},
+ }},
+ }},
+ },
+ MessageId: uint64(time.Now().UnixNano() + int64(idx)),
+ }); err != nil {
+ return err
+ }
+ }
+ if err := writeClient.CloseSend(); err != nil {
+ return err
+ }
+ for {
+ resp, recvErr := writeClient.Recv()
+ if errors.Is(recvErr, io.EOF) {
+ break
+ }
+ if recvErr != nil {
+ return recvErr
+ }
+ if resp != nil && resp.Status !=
modelv1.Status_STATUS_SUCCEED.String() {
+ return fmt.Errorf("write failed with status: %s",
resp.Status)
+ }
+ }
+ return nil
+}
+
+func writeTraceData(ctx context.Context, client tracev1.TraceServiceClient,
groupName, traceName string, count int) error {
+ writeClient, err := client.Write(ctx)
+ if err != nil {
+ return err
+ }
+ metadata := &commonv1.Metadata{Name: traceName, Group: groupName}
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < count; idx++ {
+ if err := writeClient.Send(&tracev1.WriteRequest{
+ Metadata: metadata,
+ Tags: []*modelv1.TagValue{
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: fmt.Sprintf("trace_%d", idx)}}},
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: fmt.Sprintf("span_%d_%d", idx, time.Now().UnixNano())}}},
+ {Value: &modelv1.TagValue_Timestamp{Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second))}},
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: "test_service"}}},
+ {Value: &modelv1.TagValue_Int{Int:
&modelv1.Int{Value: int64(idx * 10)}}},
+ },
+ Span: []byte(fmt.Sprintf("span_data_%d", idx)),
+ Version: uint64(idx + 1),
+ }); err != nil {
+ return err
+ }
+ }
+ if err := writeClient.CloseSend(); err != nil {
+ return err
+ }
+ for {
+ resp, recvErr := writeClient.Recv()
+ if errors.Is(recvErr, io.EOF) {
+ break
+ }
+ if recvErr != nil {
+ return recvErr
+ }
+ if resp != nil && resp.Status !=
modelv1.Status_STATUS_SUCCEED.String() {
+ return fmt.Errorf("write failed with status: %s",
resp.Status)
+ }
+ }
+ return nil
+}
+
+func verifyMeasureDeletionEffects(ctx context.Context, clients *Clients,
groupName, measureName string) error {
+ metadata := &commonv1.Metadata{Name: measureName, Group: groupName}
+
+ _, getErr := clients.MeasureRegClient.Get(ctx,
&databasev1.MeasureRegistryServiceGetRequest{Metadata: metadata})
+ if getErr == nil {
+ return fmt.Errorf("get should return error for deleted measure")
+ }
+ st, ok := status.FromError(getErr)
+ if !ok || st.Code() != codes.NotFound {
+ return fmt.Errorf("get should return NotFound, got: %v",
st.Code())
+ }
+
+ existResp, existErr := clients.MeasureRegClient.Exist(ctx,
&databasev1.MeasureRegistryServiceExistRequest{Metadata: metadata})
+ if existErr != nil {
+ return fmt.Errorf("exist call failed: %w", existErr)
+ }
+ if existResp.HasMeasure {
+ return fmt.Errorf("exist should return false for deleted
measure")
+ }
+
+ listResp, listErr := clients.MeasureRegClient.List(ctx,
&databasev1.MeasureRegistryServiceListRequest{Group: groupName})
+ if listErr != nil {
+ return fmt.Errorf("list call failed: %w", listErr)
+ }
+ for _, m := range listResp.Measure {
+ if m.Metadata.Name == measureName {
+ return fmt.Errorf("deleted measure should not appear in
list")
+ }
+ }
+
+ if err := writeMeasureData(ctx, clients.MeasureWriteClient, groupName,
measureName, 1); err == nil {
+ return fmt.Errorf("write to deleted measure should fail")
+ }
+
+ return nil
+}
+
+func verifyStreamDeletionEffects(ctx context.Context, clients *Clients,
groupName, streamName string) error {
+ metadata := &commonv1.Metadata{Name: streamName, Group: groupName}
+
+ _, getErr := clients.StreamRegClient.Get(ctx,
&databasev1.StreamRegistryServiceGetRequest{Metadata: metadata})
+ if getErr == nil {
+ return fmt.Errorf("get should return error for deleted stream")
+ }
+ st, ok := status.FromError(getErr)
+ if !ok || st.Code() != codes.NotFound {
+ return fmt.Errorf("get should return NotFound, got: %v",
st.Code())
+ }
+
+ existResp, existErr := clients.StreamRegClient.Exist(ctx,
&databasev1.StreamRegistryServiceExistRequest{Metadata: metadata})
+ if existErr != nil {
+ return fmt.Errorf("exist call failed: %w", existErr)
+ }
+ if existResp.HasStream {
+ return fmt.Errorf("exist should return false for deleted
stream")
+ }
+
+ listResp, listErr := clients.StreamRegClient.List(ctx,
&databasev1.StreamRegistryServiceListRequest{Group: groupName})
+ if listErr != nil {
+ return fmt.Errorf("list call failed: %w", listErr)
+ }
+ for _, s := range listResp.Stream {
+ if s.Metadata.Name == streamName {
+ return fmt.Errorf("deleted stream should not appear in
list")
+ }
+ }
+
+ if err := writeStreamData(ctx, clients.StreamWriteClient, groupName,
streamName, 1); err == nil {
+ return fmt.Errorf("write to deleted stream should fail")
+ }
+
+ return nil
+}
+
+// verifyStreamAvailableAndWritable confirms that a stream is registered and
+// usable: Get succeeds, Exist reports true, and a single-element write is
+// accepted.
+func verifyStreamAvailableAndWritable(ctx context.Context, clients *Clients, groupName, streamName string) error {
+	registry := clients.StreamRegClient
+	target := &commonv1.Metadata{Name: streamName, Group: groupName}
+
+	_, getErr := registry.Get(ctx, &databasev1.StreamRegistryServiceGetRequest{Metadata: target})
+	if getErr != nil {
+		return fmt.Errorf("get stream failed: %w", getErr)
+	}
+
+	existResp, existErr := registry.Exist(ctx, &databasev1.StreamRegistryServiceExistRequest{Metadata: target})
+	switch {
+	case existErr != nil:
+		return fmt.Errorf("exist stream failed: %w", existErr)
+	case !existResp.HasStream:
+		return fmt.Errorf("stream should exist")
+	}
+
+	writeErr := writeStreamData(ctx, clients.StreamWriteClient, groupName, streamName, 1)
+	if writeErr != nil {
+		return fmt.Errorf("write to active stream failed: %w", writeErr)
+	}
+	return nil
+}
+
+func verifyTraceDeletionEffects(ctx context.Context, clients *Clients,
groupName, traceName string) error {
+ metadata := &commonv1.Metadata{Name: traceName, Group: groupName}
+
+ _, getErr := clients.TraceRegClient.Get(ctx,
&databasev1.TraceRegistryServiceGetRequest{Metadata: metadata})
+ if getErr == nil {
+ return fmt.Errorf("get should return error for deleted trace")
+ }
+ st, ok := status.FromError(getErr)
+ if !ok || st.Code() != codes.NotFound {
+ return fmt.Errorf("get should return NotFound, got: %v",
st.Code())
+ }
+
+ existResp, existErr := clients.TraceRegClient.Exist(ctx,
&databasev1.TraceRegistryServiceExistRequest{Metadata: metadata})
+ if existErr != nil {
+ return fmt.Errorf("exist call failed: %w", existErr)
+ }
+ if existResp.HasTrace {
+ return fmt.Errorf("exist should return false for deleted trace")
+ }
+
+ listResp, listErr := clients.TraceRegClient.List(ctx,
&databasev1.TraceRegistryServiceListRequest{Group: groupName})
+ if listErr != nil {
+ return fmt.Errorf("list call failed: %w", listErr)
+ }
+ for _, t := range listResp.Trace {
+ if t.Metadata.Name == traceName {
+ return fmt.Errorf("deleted trace should not appear in
list")
+ }
+ }
+
+ if err := writeTraceData(ctx, clients.TraceWriteClient, groupName,
traceName, 1); err == nil {
+ return fmt.Errorf("write to deleted trace should fail")
+ }
+
+ return nil
+}
+
+// verifyMeasureQuery queries the measure over a window of one hour either
+// side of now, projecting tag "id" and field "value", and fails when the
+// query errors or returns no data points. The trailing int parameter is
+// ignored: only non-emptiness is checked.
+func verifyMeasureQuery(ctx context.Context, client measurev1.MeasureServiceClient, groupName, measureName string, _ int) error {
+	anchor := time.Now().Truncate(time.Millisecond)
+	window := &modelv1.TimeRange{
+		Begin: timestamppb.New(anchor.Add(-1 * time.Hour)),
+		End:   timestamppb.New(anchor.Add(1 * time.Hour)),
+	}
+	req := &measurev1.QueryRequest{
+		Groups:    []string{groupName},
+		Name:      measureName,
+		TimeRange: window,
+		TagProjection: &modelv1.TagProjection{
+			TagFamilies: []*modelv1.TagProjection_TagFamily{
+				{Name: "default", Tags: []string{"id"}},
+			},
+		},
+		FieldProjection: &measurev1.QueryRequest_FieldProjection{
+			Names: []string{"value"},
+		},
+	}
+
+	resp, queryErr := client.Query(ctx, req)
+	switch {
+	case queryErr != nil:
+		return fmt.Errorf("measure query failed: %w", queryErr)
+	case len(resp.DataPoints) == 0:
+		return fmt.Errorf("expected measure data points but got none")
+	}
+	return nil
+}
+
+func verifyStreamQuery(ctx context.Context, client
streamv1.StreamServiceClient, groupName, streamName, serviceID string, _ int)
error {
+ now := time.Now().Truncate(time.Millisecond)
+ queryReq := &streamv1.QueryRequest{
+ Groups: []string{groupName},
+ Name: streamName,
+ TimeRange: &modelv1.TimeRange{
+ Begin: timestamppb.New(now.Add(-1 * time.Hour)),
+ End: timestamppb.New(now.Add(1 * time.Hour)),
+ },
+ Projection: &modelv1.TagProjection{
+ TagFamilies: []*modelv1.TagProjection_TagFamily{
+ {Name: "default", Tags: []string{"svc"}},
+ },
+ },
+ }
+ if serviceID != "" {
+ queryReq.Criteria = &modelv1.Criteria{Exp:
&modelv1.Criteria_Condition{Condition: &modelv1.Condition{
+ Name: "svc",
+ Op: modelv1.Condition_BINARY_OP_EQ,
+ Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{Value: serviceID},
+ }},
+ }}}
+ }
+ resp, err := client.Query(ctx, queryReq)
+ if err != nil {
+ return fmt.Errorf("stream query failed: %w", err)
+ }
+ if len(resp.Elements) == 0 {
+ return fmt.Errorf("expected stream elements but got none")
+ }
+ return nil
+}
+
+// verifyTraceQuery queries the trace over a window of one hour either side
+// of now and fails when the query errors or returns no traces. A non-empty
+// traceID adds an equality filter on "trace_id" and projects that tag. The
+// trailing int parameter is ignored: only non-emptiness is checked.
+func verifyTraceQuery(ctx context.Context, client tracev1.TraceServiceClient, groupName, traceName, traceID string, _ int) error {
+	anchor := time.Now().Truncate(time.Millisecond)
+	req := &tracev1.QueryRequest{
+		Groups: []string{groupName},
+		Name:   traceName,
+		TimeRange: &modelv1.TimeRange{
+			Begin: timestamppb.New(anchor.Add(-1 * time.Hour)),
+			End:   timestamppb.New(anchor.Add(1 * time.Hour)),
+		},
+	}
+
+	// Filter and projection are only set together, when a trace ID is given.
+	if traceID != "" {
+		wanted := &modelv1.TagValue{Value: &modelv1.TagValue_Str{
+			Str: &modelv1.Str{Value: traceID},
+		}}
+		req.Criteria = &modelv1.Criteria{Exp: &modelv1.Criteria_Condition{Condition: &modelv1.Condition{
+			Name:  "trace_id",
+			Op:    modelv1.Condition_BINARY_OP_EQ,
+			Value: wanted,
+		}}}
+		req.TagProjection = []string{"trace_id"}
+	}
+
+	resp, queryErr := client.Query(ctx, req)
+	switch {
+	case queryErr != nil:
+		return fmt.Errorf("trace query failed: %w", queryErr)
+	case len(resp.Traces) == 0:
+		return fmt.Errorf("expected trace data but got none")
+	}
+	return nil
+}
diff --git a/test/integration/distributed/schema/schema_suite_test.go
b/test/integration/distributed/schema/schema_suite_test.go
new file mode 100644
index 000000000..51b79bd28
--- /dev/null
+++ b/test/integration/distributed/schema/schema_suite_test.go
@@ -0,0 +1,130 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_schema_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+ test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+ casesschema "github.com/apache/skywalking-banyandb/test/cases/schema"
+)
+
+// TestSchemaDeletion is the go test entry point for the distributed suite:
+// it wires gomega failures into ginkgo and runs every registered spec.
+func TestSchemaDeletion(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Distributed Schema Deletion Suite")
+}
+
+// Suite-wide state shared between the setup and teardown hooks below.
+var (
+ // deferFunc stops the liaison node, both data nodes and the etcd server,
+ // then releases the temporary space; it is assigned during suite setup.
+ deferFunc func()
+ // goods is the goroutine snapshot taken before setup, used as the baseline
+ // for the leak check after the suite.
+ goods []gleak.Goroutine
+ // connection is the gRPC connection to the liaison node used by the specs.
+ connection *grpc.ClientConn
+)
+
+// Suite setup. The first function starts an embedded etcd server, preloads
+// the stream, measure and trace schemas, starts two data nodes and one
+// liaison node, and returns the liaison address. The second function
+// receives that address, dials the liaison and publishes the connection to
+// the shared test cases.
+var _ = SynchronizedBeforeSuite(func() []byte {
+	Expect(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: flags.LogLevel,
+	})).To(Succeed())
+	pool.EnableStackTracking(true)
+	// Snapshot the goroutines before anything is started so the leak check
+	// after the suite has a baseline.
+	goods = gleak.Goroutines()
+	By("Starting etcd server")
+	ports, err := test.AllocateFreePorts(2)
+	Expect(err).NotTo(HaveOccurred())
+	dir, spaceDef, err := test.NewSpace()
+	Expect(err).NotTo(HaveOccurred())
+	ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+	server, err := embeddedetcd.NewServer(
+		embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+		embeddedetcd.RootDir(dir),
+	)
+	Expect(err).ShouldNot(HaveOccurred())
+	<-server.ReadyNotify()
+	By("Loading schema")
+	schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+		schema.Namespace(metadata.DefaultNamespace),
+		schema.ConfigureServerEndpoints([]string{ep}),
+	)
+	Expect(err).NotTo(HaveOccurred())
+	defer schemaRegistry.Close()
+	ctx := context.Background()
+	test_stream.PreloadSchema(ctx, schemaRegistry)
+	test_measure.PreloadSchema(ctx, schemaRegistry)
+	test_trace.PreloadSchema(ctx, schemaRegistry)
+	By("Starting data node 0")
+	closeDataNode0 := setup.DataNode(ep)
+	By("Starting data node 1")
+	closeDataNode1 := setup.DataNode(ep)
+	By("Starting liaison node")
+	liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+	// Tear down in reverse dependency order: liaison, data nodes, etcd, and
+	// finally the temporary space.
+	deferFunc = func() {
+		closerLiaisonNode()
+		closeDataNode0()
+		closeDataNode1()
+		_ = server.Close()
+		<-server.StopNotify()
+		spaceDef()
+	}
+	return []byte(liaisonAddr)
+}, func(address []byte) {
+	var err error
+	connection, err = grpchelper.Conn(string(address), 10*time.Second,
+		grpc.WithTransportCredentials(insecure.NewCredentials()))
+	// Assert before publishing, so a failed dial never hands the shared test
+	// cases an unusable connection.
+	Expect(err).NotTo(HaveOccurred())
+	casesschema.SharedContext = helpers.SharedContext{
+		Connection: connection,
+		BaseTime:   time.Now(),
+	}
+})
+
+// Suite teardown, first stage: close the gRPC connection if one was opened.
+// The second function is intentionally empty; stopping the servers happens
+// in the ReportAfterSuite hook instead.
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {})
+
+// Final teardown and leak checks. They only run when the suite succeeded;
+// on failure the servers are left running and no leak check is made.
+var _ = ReportAfterSuite("Distributed Schema Deletion Suite", func(report Report) {
+	if !report.SuiteSucceeded {
+		return
+	}
+	if teardown := deferFunc; teardown != nil {
+		teardown()
+	}
+	// After shutdown, no goroutine beyond the pre-suite baseline and no
+	// pooled reference may remain.
+	Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+	Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+})
diff --git a/test/integration/standalone/schema/schema_suite_test.go
b/test/integration/standalone/schema/schema_suite_test.go
new file mode 100644
index 000000000..f41655901
--- /dev/null
+++ b/test/integration/standalone/schema/schema_suite_test.go
@@ -0,0 +1,87 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_schema_test
+
+import (
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ casesschema "github.com/apache/skywalking-banyandb/test/cases/schema"
+ integration_standalone
"github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
+// TestSchemaDeletion is the go test entry point that hands control to Ginkgo
+// to run the standalone schema deletion specs.
+func TestSchemaDeletion(t *testing.T) {
+	RegisterFailHandler(Fail)
+	// Tag the whole suite with the labels shared by the standalone integration suites.
+	suiteLabels := Label(integration_standalone.Labels...)
+	RunSpecs(t, "Standalone Schema Deletion Suite", suiteLabels)
+}
+
+// Suite-level state shared by the setup and teardown hooks in this file.
+var (
+	// goods is the goroutine snapshot captured at the start of setup; it is
+	// the baseline for the goroutine leak check after the suite.
+	goods []gleak.Goroutine
+	// deferFunc is the close function returned by setup; it is invoked after
+	// a successful suite run.
+	deferFunc func()
+	// connection is the gRPC client connection opened during setup and closed
+	// in SynchronizedAfterSuite.
+	connection *grpc.ClientConn
+)
+
+// Suite bootstrap. Per Ginkgo's SynchronizedBeforeSuite contract, the first
+// function runs once on parallel process #1 and the bytes it returns are
+// passed to the second function, which runs on every process.
+var _ = SynchronizedBeforeSuite(func() []byte {
+	// Snapshot the goroutines alive before setup so the leak check in
+	// ReportAfterSuite can use them as its baseline.
+	goods = gleak.Goroutines()
+	pool.EnableStackTracking(true)
+	Expect(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: flags.LogLevel,
+	})).To(Succeed())
+	// Keep the returned close function for the end-of-suite teardown and
+	// hand the address to the per-process setup below.
+	addr, _, closeFn := setup.EmptyStandalone()
+	deferFunc = closeFn
+	return []byte(addr)
+}, func(address []byte) {
+	var err error
+	connection, err = grpchelper.Conn(string(address), 10*time.Second,
+		grpc.WithTransportCredentials(insecure.NewCredentials()))
+	// Assert the dial result before publishing the connection, so the shared
+	// test context is never populated from a failed dial.
+	Expect(err).NotTo(HaveOccurred())
+	casesschema.SharedContext = helpers.SharedContext{
+		Connection: connection,
+		BaseTime:   time.Now(),
+	}
+})
+
+// Suite teardown: close the gRPC client connection if one was opened during
+// setup. The second function is intentionally a no-op.
+var _ = SynchronizedAfterSuite(func() {
+	if connection == nil {
+		// Nothing was dialed, so there is nothing to close.
+		return
+	}
+	Expect(connection.Close()).To(Succeed())
+}, func() {})
+
+// After the suite report is produced, run the stored close function and verify
+// that no goroutines or pooled references leaked. Both the shutdown and the
+// leak checks are skipped when the suite did not succeed.
+var _ = ReportAfterSuite("Standalone Schema Deletion Suite", func(report Report) {
+	if !report.SuiteSucceeded {
+		return
+	}
+	if deferFunc != nil {
+		deferFunc()
+	}
+	// goods is the goroutine baseline; anything still running beyond it
+	// once the timeout elapses is reported as a leak.
+	Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+	Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+})