This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 722a2565b Implement entire group deletion (#1005)
722a2565b is described below
commit 722a2565bd3a5ee9a4707a08fe8f250f47c36120
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Mar 13 19:32:43 2026 +0800
Implement entire group deletion (#1005)
* Implement entire group deletion
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
api/data/data.go | 21 +
api/data/measure.go | 3 +
api/data/stream.go | 3 +
api/data/trace.go | 3 +
api/proto/banyandb/database/v1/rpc.proto | 11 +-
banyand/internal/storage/storage.go | 2 +
banyand/internal/storage/tsdb.go | 14 +
banyand/internal/wqueue/wqueue.go | 14 +
banyand/liaison/grpc/deletion.go | 465 +++++++++++++++++++++
banyand/liaison/grpc/deletion_test.go | 275 ++++++++++++
banyand/liaison/grpc/discovery.go | 63 ++-
banyand/liaison/grpc/discovery_test.go | 114 +++++
banyand/liaison/grpc/measure.go | 28 ++
banyand/liaison/grpc/property.go | 25 +-
banyand/liaison/grpc/registry.go | 66 ++-
banyand/liaison/grpc/server.go | 11 +-
banyand/liaison/grpc/stream.go | 19 +
banyand/liaison/grpc/trace.go | 19 +
banyand/measure/svc_data.go | 23 +
banyand/measure/svc_liaison.go | 24 ++
banyand/measure/svc_standalone.go | 5 +
banyand/metadata/client.go | 8 +
banyand/metadata/metadata.go | 2 +
banyand/metadata/schema/collector.go | 82 ++++
banyand/property/db/db.go | 28 ++
banyand/property/service.go | 37 +-
banyand/stream/svc_liaison.go | 24 ++
banyand/stream/svc_standalone.go | 24 ++
banyand/trace/handoff_controller.go | 46 ++
banyand/trace/metadata.go | 17 +-
banyand/trace/svc_liaison.go | 24 ++
banyand/trace/svc_standalone.go | 25 ++
bydbctl/internal/cmd/group_test.go | 15 +-
docs/api-reference.md | 5 +-
pkg/index/inverted/inverted_series.go | 6 +
pkg/schema/cache.go | 23 +-
pkg/schema/schema.go | 2 +
pkg/test/setup/setup.go | 37 +-
.../distributed/deletion/deletion_suite_test.go | 313 ++++++++++++++
.../distributed/{inspect => inspection}/common.go | 4 +-
.../{inspect => inspection}/etcd/suite_test.go | 6 +-
.../{inspect => inspection}/property/suite_test.go | 6 +-
.../standalone/deletion/deletion_suite_test.go | 225 ++++++++++
.../standalone/{inspect => inspection}/common.go | 4 +-
.../{inspect => inspection}/etcd/suite_test.go | 6 +-
.../{inspect => inspection}/property/suite_test.go | 6 +-
46 files changed, 2118 insertions(+), 65 deletions(-)
diff --git a/api/data/data.go b/api/data/data.go
index 9857bac49..b8fb4ef68 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -62,6 +62,9 @@ var (
TopicStreamCollectLiaisonInfo.String():
TopicStreamCollectLiaisonInfo,
TopicTraceCollectDataInfo.String():
TopicTraceCollectDataInfo,
TopicTraceCollectLiaisonInfo.String():
TopicTraceCollectLiaisonInfo,
+ TopicMeasureDropGroup.String(): TopicMeasureDropGroup,
+ TopicStreamDropGroup.String(): TopicStreamDropGroup,
+ TopicTraceDropGroup.String(): TopicTraceDropGroup,
}
// TopicRequestMap is the map of topic name to request message.
@@ -157,6 +160,15 @@ var (
TopicTraceCollectLiaisonInfo: func() proto.Message {
return &databasev1.GroupRegistryServiceInspectRequest{}
},
+ TopicMeasureDropGroup: func() proto.Message {
+ return &databasev1.GroupRegistryServiceDeleteRequest{}
+ },
+ TopicStreamDropGroup: func() proto.Message {
+ return &databasev1.GroupRegistryServiceDeleteRequest{}
+ },
+ TopicTraceDropGroup: func() proto.Message {
+ return &databasev1.GroupRegistryServiceDeleteRequest{}
+ },
}
// TopicResponseMap is the map of topic name to response message.
@@ -207,6 +219,15 @@ var (
TopicTraceCollectLiaisonInfo: func() proto.Message {
return &databasev1.LiaisonInfo{}
},
+ TopicMeasureDropGroup: func() proto.Message {
+ return &databasev1.GroupRegistryServiceDeleteRequest{}
+ },
+ TopicStreamDropGroup: func() proto.Message {
+ return &databasev1.GroupRegistryServiceDeleteRequest{}
+ },
+ TopicTraceDropGroup: func() proto.Message {
+ return &databasev1.GroupRegistryServiceDeleteRequest{}
+ },
}
// TopicCommon is the common topic for data transmission.
diff --git a/api/data/measure.go b/api/data/measure.go
index d414abeb9..a6bdcbdc8 100644
--- a/api/data/measure.go
+++ b/api/data/measure.go
@@ -109,3 +109,6 @@ var TopicMeasureCollectDataInfo =
bus.BiTopic("measure-collect-data-info")
// TopicMeasureCollectLiaisonInfo is the topic for collecting liaison info
from liaison nodes.
var TopicMeasureCollectLiaisonInfo =
bus.BiTopic("measure-collect-liaison-info")
+
// TopicMeasureDropGroup is the bus topic used to request deletion of a
// measure group's on-disk data files.
var TopicMeasureDropGroup = bus.BiTopic("measure-drop-group")
diff --git a/api/data/stream.go b/api/data/stream.go
index 560931a39..2130f328e 100644
--- a/api/data/stream.go
+++ b/api/data/stream.go
@@ -99,3 +99,6 @@ var TopicStreamCollectDataInfo =
bus.BiTopic("stream-collect-data-info")
// TopicStreamCollectLiaisonInfo is the topic for collecting liaison info from
liaison nodes.
var TopicStreamCollectLiaisonInfo = bus.BiTopic("stream-collect-liaison-info")
+
// TopicStreamDropGroup is the bus topic used to request deletion of a
// stream group's on-disk data files.
var TopicStreamDropGroup = bus.BiTopic("stream-drop-group")
diff --git a/api/data/trace.go b/api/data/trace.go
index 0c7d55795..7bf8edf5a 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -81,3 +81,6 @@ var TopicTraceCollectDataInfo =
bus.BiTopic("trace-collect-data-info")
// TopicTraceCollectLiaisonInfo is the topic for collecting liaison info from
liaison nodes.
var TopicTraceCollectLiaisonInfo = bus.BiTopic("trace-collect-liaison-info")
+
// TopicTraceDropGroup is the bus topic used to request deletion of a
// trace group's on-disk data files.
var TopicTraceDropGroup = bus.BiTopic("trace-drop-group")
diff --git a/api/proto/banyandb/database/v1/rpc.proto
b/api/proto/banyandb/database/v1/rpc.proto
index c0893c065..d9a1cda92 100644
--- a/api/proto/banyandb/database/v1/rpc.proto
+++ b/api/proto/banyandb/database/v1/rpc.proto
@@ -364,14 +364,15 @@ message GroupRegistryServiceDeleteRequest {
// force indicates whether to force delete the group even if it contains
data.
// When false, deletion will fail if the group is not empty.
bool force = 3;
+ // data_only indicates whether to delete only data files without removing
metadata.
+ // When true, metadata is preserved.
+ bool data_only = 4;
}
// GroupRegistryServiceDeleteResponse is the response for deleting a group.
message GroupRegistryServiceDeleteResponse {
- // deleted indicates whether the group was deleted.
- bool deleted = 1;
- // task_id is the ID of the background deletion task.
- string task_id = 2;
+ // schema_info contains the schema resources that would be deleted
(populated in dry-run mode).
+ SchemaInfo schema_info = 1;
}
// GroupDeletionTask represents the status of a group deletion operation.
@@ -402,6 +403,8 @@ message GroupDeletionTask {
string message = 6;
// created_at is the timestamp when the task was created.
google.protobuf.Timestamp created_at = 7;
+ // updated_at is the timestamp when the task was last updated.
+ google.protobuf.Timestamp updated_at = 8;
}
// GroupRegistryServiceQueryRequest is the request for querying a group
deletion task.
diff --git a/banyand/internal/storage/storage.go
b/banyand/internal/storage/storage.go
index 7ff809a95..3b30bef84 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -121,6 +121,8 @@ type TSDB[T TSTable, O any] interface {
// DeleteOldestSegment deletes exactly one oldest segment if it exists
and meets safety rules.
// Returns true if a segment was deleted, false otherwise.
DeleteOldestSegment() (bool, error)
+ // Drop closes the database and removes all data files from disk.
+ Drop() error
}
// Segment is a time range of data.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 244774a0d..6cc5c5369 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -175,6 +175,20 @@ func (d *database[T, O]) Close() error {
return nil
}
// Drop closes the database and removes all data files from disk.
// Close runs before the recover handler is installed on purpose: a close
// failure is returned as-is, and only a panic raised by MustRMAll below is
// converted into an error.
func (d *database[T, O]) Drop() (err error) {
	if closeErr := d.Close(); closeErr != nil {
		return closeErr
	}
	// MustRMAll panics on failure; translate that panic into the named
	// return value so callers receive a regular error.
	defer func() {
		if r := recover(); r != nil {
			err = errors.Errorf("failed to remove database directory %s: %v", d.location, r)
		}
	}()
	d.lfs.MustRMAll(d.location)
	return nil
}
+
// OpenTSDB returns a new tsdb runtime. This constructor will create a new
database if it's absent,
// or load an existing one.
func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O],
cache Cache, group string) (TSDB[T, O], error) {
diff --git a/banyand/internal/wqueue/wqueue.go
b/banyand/internal/wqueue/wqueue.go
index 357d5a952..b24c56059 100644
--- a/banyand/internal/wqueue/wqueue.go
+++ b/banyand/internal/wqueue/wqueue.go
@@ -125,6 +125,20 @@ func (q *Queue[S, O]) Close() error {
return nil
}
// Drop closes the queue and removes all data files from disk.
// Close runs before the recover handler is installed on purpose: a close
// failure is returned as-is, and only a panic raised by MustRMAll below is
// converted into an error.
func (q *Queue[S, O]) Drop() (err error) {
	if closeErr := q.Close(); closeErr != nil {
		return closeErr
	}
	// MustRMAll panics on failure; translate that panic into the named
	// return value so callers receive a regular error.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("failed to remove queue data at %s: %v", q.location, r)
		}
	}()
	q.lfs.MustRMAll(q.location)
	return nil
}
+
// Open creates and initializes a new queue with the given options.
func Open[S SubQueue, O any](ctx context.Context, opts Opts[S, O], _ string)
(*Queue[S, O], error) {
p := common.GetPosition(ctx)
diff --git a/banyand/liaison/grpc/deletion.go b/banyand/liaison/grpc/deletion.go
new file mode 100644
index 000000000..60edfd033
--- /dev/null
+++ b/banyand/liaison/grpc/deletion.go
@@ -0,0 +1,465 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
const (
	// internalDeletionTaskGroup is the reserved property group that stores
	// deletion-task bookkeeping records.
	internalDeletionTaskGroup = "_deletion_task"
	// internalDeletionTaskPropertyName is the property schema name used to
	// persist deletion tasks.
	internalDeletionTaskPropertyName = "_deletion_task"
	// taskDataTagName is the binary tag carrying the marshaled task.
	taskDataTagName = "task_data"
	// taskStaleTimeout is how long a PENDING/IN_PROGRESS task may go
	// without an update before a new deletion attempt may take it over.
	taskStaleTimeout = 10 * time.Minute
)
+
// propertyApplier is the subset of the property service used to persist and
// look up deletion tasks; kept as an interface so tests can supply a mock.
type propertyApplier interface {
	// Apply creates or replaces a property document.
	Apply(ctx context.Context, req *propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error)
	// Query retrieves property documents.
	Query(ctx context.Context, req *propertyv1.QueryRequest) (*propertyv1.QueryResponse, error)
}
+
// groupDeletionTaskManager coordinates asynchronous whole-group deletion:
// it persists task progress through the property service and guards against
// concurrent deletions of the same group.
type groupDeletionTaskManager struct {
	schemaRegistry metadata.Repo   // access to schema registries and data info
	propServer     propertyApplier // persistence backend for task records
	log            *logger.Logger
	groupRepo      *groupRepo // tracks in-flight requests per group
	tasks          sync.Map   // group name -> *databasev1.GroupDeletionTask (in-flight)
}
+
+func newGroupDeletionTaskManager(
+ schemaRegistry metadata.Repo, propServer *propertyServer, gr
*groupRepo, l *logger.Logger,
+) *groupDeletionTaskManager {
+ return &groupDeletionTaskManager{
+ schemaRegistry: schemaRegistry,
+ propServer: propServer,
+ groupRepo: gr,
+ log: l,
+ }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context)
error {
+ group := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: internalDeletionTaskGroup,
+ },
+ Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ },
+ }
+ _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx,
internalDeletionTaskGroup)
+ if getGroupErr != nil {
+ if !errors.Is(getGroupErr, schema.ErrGRPCResourceNotFound) {
+ return fmt.Errorf("failed to get internal deletion task
group: %w", getGroupErr)
+ }
+ if createErr :=
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+ return fmt.Errorf("failed to create internal deletion
task group: %w", createErr)
+ }
+ }
+ propSchema := &databasev1.Property{
+ Metadata: &commonv1.Metadata{
+ Group: internalDeletionTaskGroup,
+ Name: internalDeletionTaskPropertyName,
+ },
+ Tags: []*databasev1.TagSpec{
+ {
+ Name: taskDataTagName,
+ Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+ },
+ },
+ }
+ _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx,
propSchema.Metadata)
+ if getPropErr != nil {
+ if !errors.Is(getPropErr, schema.ErrGRPCResourceNotFound) {
+ return fmt.Errorf("failed to get internal deletion task
property schema: %w", getPropErr)
+ }
+ if createErr :=
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr
!= nil {
+ return fmt.Errorf("failed to create internal deletion
task property schema: %w", createErr)
+ }
+ }
+ return nil
+}
+
// startDeletion validates that no live deletion is running for the group and
// then launches the deletion asynchronously. A COMPLETED task for a group
// that no longer exists rejects the request; FAILED and stale
// PENDING/IN_PROGRESS tasks are retried; a recently-updated
// PENDING/IN_PROGRESS task rejects the request.
func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group string, dataOnly bool) error {
	existingTask, getTaskErr := m.getDeletionTask(ctx, group)
	if getTaskErr == nil {
		switch existingTask.CurrentPhase {
		case databasev1.GroupDeletionTask_PHASE_COMPLETED:
			// A completed task only blocks a new run when the group is
			// actually gone; otherwise clear the stale record and fall
			// through to start a fresh task.
			_, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, group)
			if getGroupErr != nil {
				if errors.Is(getGroupErr, schema.ErrGRPCResourceNotFound) {
					return fmt.Errorf("group %s has already been deleted", group)
				}
				return fmt.Errorf("failed to check group existence: %w", getGroupErr)
			}
			m.tasks.Delete(group)
		case databasev1.GroupDeletionTask_PHASE_FAILED:
			// Retry a failed task; LoadOrStore is the mutual-exclusion
			// point guarding against two concurrent retries.
			if _, loaded := m.tasks.LoadOrStore(group, existingTask); loaded {
				return fmt.Errorf("deletion task for group %s is already in progress", group)
			}
			// NOTE(review): existingTask is mutated after being published
			// via LoadOrStore; a concurrent getDeletionTask may clone it
			// mid-update — confirm this window is acceptable.
			existingTask.CurrentPhase = databasev1.GroupDeletionTask_PHASE_PENDING
			existingTask.Message = "retrying after previous failure"
			go m.executeDeletion(context.WithoutCancel(ctx), group, existingTask, dataOnly)
			return nil
		case databasev1.GroupDeletionTask_PHASE_PENDING,
			databasev1.GroupDeletionTask_PHASE_IN_PROGRESS:
			// Reject only if the task was updated within taskStaleTimeout;
			// otherwise treat it as abandoned and take it over.
			if existingTask.GetUpdatedAt() != nil &&
				time.Since(existingTask.GetUpdatedAt().AsTime()) < taskStaleTimeout {
				return fmt.Errorf("deletion task for group %s is already in progress (last updated %s ago)",
					group, time.Since(existingTask.GetUpdatedAt().AsTime()).Truncate(time.Second))
			}
			if _, loaded := m.tasks.LoadOrStore(group, existingTask); loaded {
				return fmt.Errorf("deletion task for group %s is already in progress", group)
			}
			existingTask.CurrentPhase = databasev1.GroupDeletionTask_PHASE_PENDING
			existingTask.Message = "retrying after stale task detected"
			go m.executeDeletion(context.WithoutCancel(ctx), group, existingTask, dataOnly)
			return nil
		}
	}

	// No usable previous task: create a fresh one and claim the slot.
	task := &databasev1.GroupDeletionTask{
		CurrentPhase:  databasev1.GroupDeletionTask_PHASE_PENDING,
		TotalCounts:   make(map[string]int32),
		DeletedCounts: make(map[string]int32),
		CreatedAt:     timestamppb.Now(),
	}
	if _, loaded := m.tasks.LoadOrStore(group, task); loaded {
		return fmt.Errorf("deletion task for group %s is already in progress", group)
	}

	// Record the total data size up front so progress can be reported.
	dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
	if dataErr != nil {
		m.tasks.Delete(group)
		return fmt.Errorf("failed to collect data info for group %s: %w", group, dataErr)
	}
	var totalDataSize int64
	for _, di := range dataInfo {
		totalDataSize += di.GetDataSizeBytes()
	}
	task.TotalDataSizeBytes = totalDataSize
	if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
		m.tasks.Delete(group)
		return fmt.Errorf("failed to save initial deletion task for group %s: %w", group, saveErr)
	}
	// Detach from the caller's context so the background deletion survives
	// the RPC that triggered it.
	go m.executeDeletion(context.WithoutCancel(ctx), group, task, dataOnly)
	return nil
}
+
// executeDeletion runs the whole deletion pipeline in the background: it
// waits for in-flight requests against the group to drain, drops the on-disk
// data, and — unless dataOnly is set — removes every schema resource and
// finally the group itself, persisting progress after each step.
func (m *groupDeletionTaskManager) executeDeletion(ctx context.Context, group string, task *databasev1.GroupDeletionTask, dataOnly bool) {
	task.Message = "waiting for in-flight requests to complete"
	m.saveProgress(ctx, group, task)
	// Drain requests already routed to this group before touching data; the
	// group is marked deleted in the repo regardless of the outcome.
	done := m.groupRepo.waitInflightRequests(group)
	defer m.groupRepo.markDeleted(group)
	<-done

	task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_IN_PROGRESS
	groupMeta, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, group)
	notFound := errors.Is(getGroupErr, schema.ErrGRPCResourceNotFound)
	if getGroupErr != nil && !notFound {
		m.failTask(ctx, group, task, fmt.Sprintf("failed to get group metadata: %v", getGroupErr))
		return
	}
	// Only drop data files when the group metadata still exists; a missing
	// group means a previous run already removed the data.
	if !notFound && groupMeta != nil {
		task.Message = "deleting data files"
		m.saveProgress(ctx, group, task)
		if dropErr := m.schemaRegistry.DropGroup(ctx, groupMeta.Catalog, group); dropErr != nil {
			m.failTask(ctx, group, task, fmt.Sprintf("failed to delete data files: %v", dropErr))
			return
		}
	}

	if dataOnly {
		// data_only requests keep all schema metadata intact.
		task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_COMPLETED
		task.Message = "data files deleted successfully"
		m.saveProgress(ctx, group, task)
		return
	}

	// Delete schema resources in order, finishing with the group itself.
	opt := schema.ListOpt{Group: group}
	type deletionStep struct {
		fn      func() error
		message string
	}
	steps := []deletionStep{
		{func() error { return m.deleteIndexRuleBindings(ctx, opt, task) }, "deleting index rule bindings"},
		{func() error { return m.deleteIndexRules(ctx, opt, task) }, "deleting index rules"},
		{func() error { return m.deleteProperties(ctx, opt, task) }, "deleting properties"},
		{func() error { return m.deleteStreams(ctx, opt, task) }, "deleting streams"},
		{func() error { return m.deleteMeasures(ctx, opt, task) }, "deleting measures"},
		{func() error { return m.deleteTraces(ctx, opt, task) }, "deleting traces"},
		{func() error { return m.deleteTopNAggregations(ctx, opt, task) }, "deleting topN aggregations"},
		{func() error { return m.deleteGroup(ctx, group) }, "deleting group"},
	}
	for _, step := range steps {
		// runStep marks the task FAILED on error; just stop the pipeline.
		if stepErr := m.runStep(ctx, group, task, step.message, step.fn); stepErr != nil {
			return
		}
	}

	task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_COMPLETED
	task.Message = "group deleted successfully"
	m.saveProgress(ctx, group, task)
}
+
+func (m *groupDeletionTaskManager) deleteIndexRuleBindings(
+ ctx context.Context, opt schema.ListOpt, task
*databasev1.GroupDeletionTask,
+) error {
+ bindings, listErr :=
m.schemaRegistry.IndexRuleBindingRegistry().ListIndexRuleBinding(ctx, opt)
+ if listErr != nil {
+ return listErr
+ }
+ task.TotalCounts["index_rule_binding"] = int32(len(bindings))
+ for _, binding := range bindings {
+ if _, deleteErr :=
m.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx,
binding.GetMetadata()); deleteErr != nil {
+ return fmt.Errorf("index rule binding %s: %w",
binding.GetMetadata().GetName(), deleteErr)
+ }
+ }
+ task.DeletedCounts["index_rule_binding"] =
task.TotalCounts["index_rule_binding"]
+ return nil
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRules(
+ ctx context.Context, opt schema.ListOpt, task
*databasev1.GroupDeletionTask,
+) error {
+ indexRules, listErr :=
m.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, opt)
+ if listErr != nil {
+ return listErr
+ }
+ task.TotalCounts["index_rule"] = int32(len(indexRules))
+ for _, rule := range indexRules {
+ if _, deleteErr :=
m.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx, rule.GetMetadata());
deleteErr != nil {
+ return fmt.Errorf("index rule %s: %w",
rule.GetMetadata().GetName(), deleteErr)
+ }
+ }
+ task.DeletedCounts["index_rule"] = task.TotalCounts["index_rule"]
+ return nil
+}
+
+func (m *groupDeletionTaskManager) deleteProperties(
+ ctx context.Context, opt schema.ListOpt, task
*databasev1.GroupDeletionTask,
+) error {
+ properties, listErr :=
m.schemaRegistry.PropertyRegistry().ListProperty(ctx, opt)
+ if listErr != nil {
+ return listErr
+ }
+ task.TotalCounts["property"] = int32(len(properties))
+ for _, prop := range properties {
+ if _, deleteErr :=
m.schemaRegistry.PropertyRegistry().DeleteProperty(ctx, prop.GetMetadata());
deleteErr != nil {
+ return fmt.Errorf("property %s: %w",
prop.GetMetadata().GetName(), deleteErr)
+ }
+ }
+ task.DeletedCounts["property"] = task.TotalCounts["property"]
+ return nil
+}
+
+func (m *groupDeletionTaskManager) deleteStreams(
+ ctx context.Context, opt schema.ListOpt, task
*databasev1.GroupDeletionTask,
+) error {
+ streams, listErr := m.schemaRegistry.StreamRegistry().ListStream(ctx,
opt)
+ if listErr != nil {
+ return listErr
+ }
+ task.TotalCounts["stream"] = int32(len(streams))
+ for _, stream := range streams {
+ if _, deleteErr :=
m.schemaRegistry.StreamRegistry().DeleteStream(ctx, stream.GetMetadata());
deleteErr != nil {
+ return fmt.Errorf("stream %s: %w",
stream.GetMetadata().GetName(), deleteErr)
+ }
+ }
+ task.DeletedCounts["stream"] = task.TotalCounts["stream"]
+ return nil
+}
+
+func (m *groupDeletionTaskManager) deleteMeasures(
+ ctx context.Context, opt schema.ListOpt, task
*databasev1.GroupDeletionTask,
+) error {
+ measures, listErr :=
m.schemaRegistry.MeasureRegistry().ListMeasure(ctx, opt)
+ if listErr != nil {
+ return listErr
+ }
+ task.TotalCounts["measure"] = int32(len(measures))
+ for _, measure := range measures {
+ if _, deleteErr :=
m.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx, measure.GetMetadata());
deleteErr != nil {
+ return fmt.Errorf("measure %s: %w",
measure.GetMetadata().GetName(), deleteErr)
+ }
+ }
+ task.DeletedCounts["measure"] = task.TotalCounts["measure"]
+ return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTraces(
+ ctx context.Context, opt schema.ListOpt, task
*databasev1.GroupDeletionTask,
+) error {
+ traces, listErr := m.schemaRegistry.TraceRegistry().ListTrace(ctx, opt)
+ if listErr != nil {
+ return listErr
+ }
+ task.TotalCounts["trace"] = int32(len(traces))
+ for _, trace := range traces {
+ if _, deleteErr :=
m.schemaRegistry.TraceRegistry().DeleteTrace(ctx, trace.GetMetadata());
deleteErr != nil {
+ return fmt.Errorf("trace %s: %w",
trace.GetMetadata().GetName(), deleteErr)
+ }
+ }
+ task.DeletedCounts["trace"] = task.TotalCounts["trace"]
+ return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTopNAggregations(
+ ctx context.Context, opt schema.ListOpt, task
*databasev1.GroupDeletionTask,
+) error {
+ topNAggs, listErr :=
m.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, opt)
+ if listErr != nil {
+ return listErr
+ }
+ task.TotalCounts["topn_aggregation"] = int32(len(topNAggs))
+ for _, agg := range topNAggs {
+ if _, deleteErr :=
m.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx,
agg.GetMetadata()); deleteErr != nil {
+ return fmt.Errorf("topN aggregation %s: %w",
agg.GetMetadata().GetName(), deleteErr)
+ }
+ }
+ task.DeletedCounts["topn_aggregation"] =
task.TotalCounts["topn_aggregation"]
+ return nil
+}
+
+func (m *groupDeletionTaskManager) deleteGroup(ctx context.Context, group
string) error {
+ _, deleteErr := m.schemaRegistry.GroupRegistry().DeleteGroup(ctx, group)
+ if deleteErr != nil && !errors.Is(deleteErr,
schema.ErrGRPCResourceNotFound) {
+ return fmt.Errorf("failed to delete group %s: %w", group,
deleteErr)
+ }
+ return nil
+}
+
+func (m *groupDeletionTaskManager) runStep(
+ ctx context.Context, group string, task *databasev1.GroupDeletionTask,
+ message string, fn func() error,
+) error {
+ task.Message = message
+ if stepErr := fn(); stepErr != nil {
+ m.failTask(ctx, group, task, fmt.Sprintf("%s failed: %v",
message, stepErr))
+ return stepErr
+ }
+ m.saveProgress(ctx, group, task)
+ return nil
+}
+
+func (m *groupDeletionTaskManager) saveProgress(ctx context.Context, group
string, task *databasev1.GroupDeletionTask) {
+ task.UpdatedAt = timestamppb.Now()
+ snapshot := proto.Clone(task).(*databasev1.GroupDeletionTask)
+ m.tasks.Store(group, snapshot)
+ if saveErr := m.saveDeletionTask(ctx, group, snapshot); saveErr != nil {
+ m.log.Error().Err(saveErr).Str("group", group).Msg("failed to
save deletion progress")
+ }
+}
+
+func (m *groupDeletionTaskManager) failTask(
+ ctx context.Context, group string, task *databasev1.GroupDeletionTask,
msg string,
+) {
+ task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_FAILED
+ task.Message = msg
+ m.log.Error().Str("group", group).Msg(msg)
+ m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) saveDeletionTask(ctx context.Context, group
string, task *databasev1.GroupDeletionTask) error {
+ taskData, marshalErr := proto.Marshal(task)
+ if marshalErr != nil {
+ return fmt.Errorf("failed to marshal deletion task: %w",
marshalErr)
+ }
+ _, applyErr := m.propServer.Apply(ctx, &propertyv1.ApplyRequest{
+ Property: &propertyv1.Property{
+ Metadata: &commonv1.Metadata{
+ Group: internalDeletionTaskGroup,
+ Name: internalDeletionTaskPropertyName,
+ },
+ Id: group,
+ Tags: []*modelv1.Tag{
+ {
+ Key: taskDataTagName,
+ Value: &modelv1.TagValue{Value:
&modelv1.TagValue_BinaryData{BinaryData: taskData}},
+ },
+ },
+ },
+ Strategy: propertyv1.ApplyRequest_STRATEGY_REPLACE,
+ })
+ if applyErr != nil {
+ return fmt.Errorf("failed to save deletion task property: %w",
applyErr)
+ }
+ return nil
+}
+
// getDeletionTask returns the deletion task for a group, preferring the
// in-memory cache and falling back to the persisted internal property.
// The cached task is cloned before returning, so callers may mutate it.
func (m *groupDeletionTaskManager) getDeletionTask(ctx context.Context, group string) (*databasev1.GroupDeletionTask, error) {
	if v, ok := m.tasks.Load(group); ok {
		if task, isTask := v.(*databasev1.GroupDeletionTask); isTask {
			return proto.Clone(task).(*databasev1.GroupDeletionTask), nil
		}
	}
	// Cache miss: the task may have been written by a previous process run.
	resp, queryErr := m.propServer.Query(ctx, &propertyv1.QueryRequest{
		Groups: []string{internalDeletionTaskGroup},
		Name:   internalDeletionTaskPropertyName,
		Ids:    []string{group},
		Limit:  1,
	})
	if queryErr != nil {
		return nil, fmt.Errorf("failed to query deletion task property: %w", queryErr)
	}
	if len(resp.Properties) == 0 {
		return nil, fmt.Errorf("deletion task for group %s not found", group)
	}
	// The task is stored as a protobuf blob under the task_data tag.
	for _, tag := range resp.Properties[0].Tags {
		if tag.Key == taskDataTagName {
			binaryData := tag.Value.GetBinaryData()
			if binaryData == nil {
				return nil, fmt.Errorf("deletion task for group %s has no binary data", group)
			}
			var task databasev1.GroupDeletionTask
			if unmarshalErr := proto.Unmarshal(binaryData, &task); unmarshalErr != nil {
				return nil, fmt.Errorf("failed to unmarshal deletion task: %w", unmarshalErr)
			}
			return &task, nil
		}
	}
	return nil, fmt.Errorf("deletion task for group %s has no task_data tag", group)
}
+
+func (m *groupDeletionTaskManager) hasNonEmptyResources(ctx context.Context,
group string) (bool, error) {
+ dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+ if dataErr != nil {
+ return false, fmt.Errorf("failed to collect data info: %w",
dataErr)
+ }
+ for _, di := range dataInfo {
+ if di.GetDataSizeBytes() > 0 {
+ return true, nil
+ }
+ }
+ return false, nil
+}
diff --git a/banyand/liaison/grpc/deletion_test.go
b/banyand/liaison/grpc/deletion_test.go
new file mode 100644
index 000000000..88b810a7c
--- /dev/null
+++ b/banyand/liaison/grpc/deletion_test.go
@@ -0,0 +1,275 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/mock/gomock"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+type mockPropertyApplier struct {
+ applyFn func(ctx context.Context, req *propertyv1.ApplyRequest)
(*propertyv1.ApplyResponse, error)
+ queryFn func(ctx context.Context, req *propertyv1.QueryRequest)
(*propertyv1.QueryResponse, error)
+}
+
+func (m *mockPropertyApplier) Apply(ctx context.Context, req
*propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) {
+ return m.applyFn(ctx, req)
+}
+
+func (m *mockPropertyApplier) Query(ctx context.Context, req
*propertyv1.QueryRequest) (*propertyv1.QueryResponse, error) {
+ return m.queryFn(ctx, req)
+}
+
+// stubIndexRuleBinding implements schema.IndexRuleBinding returning empty
results.
+type stubIndexRuleBinding struct{}
+
+func (s *stubIndexRuleBinding) GetIndexRuleBinding(_ context.Context, _
*commonv1.Metadata) (*databasev1.IndexRuleBinding, error) {
+ return nil, nil
+}
+
+func (s *stubIndexRuleBinding) ListIndexRuleBinding(_ context.Context, _
schema.ListOpt) ([]*databasev1.IndexRuleBinding, error) {
+ return nil, nil
+}
+
+func (s *stubIndexRuleBinding) CreateIndexRuleBinding(_ context.Context, _
*databasev1.IndexRuleBinding) error {
+ return nil
+}
+
+func (s *stubIndexRuleBinding) UpdateIndexRuleBinding(_ context.Context, _
*databasev1.IndexRuleBinding) error {
+ return nil
+}
+
+func (s *stubIndexRuleBinding) DeleteIndexRuleBinding(_ context.Context, _
*commonv1.Metadata) (bool, error) {
+ return true, nil
+}
+
+// stubIndexRule implements schema.IndexRule returning empty results.
+type stubIndexRule struct{}
+
+func (s *stubIndexRule) GetIndexRule(_ context.Context, _ *commonv1.Metadata)
(*databasev1.IndexRule, error) {
+ return nil, nil
+}
+
+func (s *stubIndexRule) ListIndexRule(_ context.Context, _ schema.ListOpt)
([]*databasev1.IndexRule, error) {
+ return nil, nil
+}
+
+func (s *stubIndexRule) CreateIndexRule(_ context.Context, _
*databasev1.IndexRule) error {
+ return nil
+}
+
+func (s *stubIndexRule) UpdateIndexRule(_ context.Context, _
*databasev1.IndexRule) error {
+ return nil
+}
+
+func (s *stubIndexRule) DeleteIndexRule(_ context.Context, _
*commonv1.Metadata) (bool, error) {
+ return true, nil
+}
+
+func TestHasNonEmptyResources(t *testing.T) {
+ tests := []struct {
+ name string
+ infos []*databasev1.DataInfo
+ expected bool
+ }{
+ {
+ name: "all zero sizes",
+ infos: []*databasev1.DataInfo{{DataSizeBytes: 0},
{DataSizeBytes: 0}},
+ expected: false,
+ },
+ {
+ name: "one non-zero size",
+ infos: []*databasev1.DataInfo{{DataSizeBytes: 0},
{DataSizeBytes: 1024}},
+ expected: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ mockRepo := metadata.NewMockRepo(ctrl)
+ mockRepo.EXPECT().CollectDataInfo(gomock.Any(),
"test-group").Return(tt.infos, nil)
+
+ m := &groupDeletionTaskManager{schemaRegistry: mockRepo}
+ hasResources, checkErr :=
m.hasNonEmptyResources(context.Background(), "test-group")
+ require.NoError(t, checkErr)
+ assert.Equal(t, tt.expected, hasResources)
+ })
+ }
+}
+
+func TestDeletion(t *testing.T) {
+ t.Run("duplicate prevention", func(t *testing.T) {
+ m := &groupDeletionTaskManager{}
+ m.tasks.Store("existing-group", &databasev1.GroupDeletionTask{
+ CurrentPhase:
databasev1.GroupDeletionTask_PHASE_IN_PROGRESS,
+ UpdatedAt: timestamppb.Now(),
+ })
+
+ err := m.startDeletion(context.Background(), "existing-group",
false)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "already in progress")
+ })
+
+ t.Run("deletion", func(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ const group = "test-group"
+ gr := &groupRepo{
+ log: logger.GetLogger("test"),
+ resourceOpts: make(map[string]*commonv1.ResourceOpts),
+ inflight: make(map[string]*groupInflight),
+ }
+ require.NoError(t, gr.acquireRequest(group))
+
+ var (
+ mu sync.Mutex
+ lastTask = &databasev1.GroupDeletionTask{}
+ )
+ propApplier := &mockPropertyApplier{
+ applyFn: func(_ context.Context, req
*propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) {
+ for _, tag := range req.Property.Tags {
+ if tag.Key == taskDataTagName {
+ var task
databasev1.GroupDeletionTask
+ if unmarshalErr :=
proto.Unmarshal(tag.Value.GetBinaryData(), &task); unmarshalErr == nil {
+ mu.Lock()
+ lastTask =
proto.Clone(&task).(*databasev1.GroupDeletionTask)
+ mu.Unlock()
+ }
+ }
+ }
+ return &propertyv1.ApplyResponse{}, nil
+ },
+ queryFn: func(_ context.Context, _
*propertyv1.QueryRequest) (*propertyv1.QueryResponse, error) {
+ mu.Lock()
+ cloned :=
proto.Clone(lastTask).(*databasev1.GroupDeletionTask)
+ mu.Unlock()
+ taskData, _ := proto.Marshal(cloned)
+ return &propertyv1.QueryResponse{
+ Properties: []*propertyv1.Property{{
+ Id: group,
+ Tags: []*modelv1.Tag{{
+ Key: taskDataTagName,
+ Value:
&modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: taskData}},
+ }},
+ }},
+ }, nil
+ },
+ }
+
+ mockRepo := metadata.NewMockRepo(ctrl)
+ mockRepo.EXPECT().CollectDataInfo(gomock.Any(),
group).Return([]*databasev1.DataInfo{{DataSizeBytes: 512}}, nil)
+
mockRepo.EXPECT().IndexRuleBindingRegistry().Return(&stubIndexRuleBinding{})
+ mockRepo.EXPECT().IndexRuleRegistry().Return(&stubIndexRule{})
+
+ mockProperty := schema.NewMockProperty(ctrl)
+ mockProperty.EXPECT().ListProperty(gomock.Any(),
schema.ListOpt{Group: group}).Return(nil, nil)
+ mockRepo.EXPECT().PropertyRegistry().Return(mockProperty)
+
+ mockStream := schema.NewMockStream(ctrl)
+ mockStream.EXPECT().ListStream(gomock.Any(),
schema.ListOpt{Group: group}).Return(nil, nil)
+ mockRepo.EXPECT().StreamRegistry().Return(mockStream)
+
+ mockMeasure := schema.NewMockMeasure(ctrl)
+ mockMeasure.EXPECT().ListMeasure(gomock.Any(),
schema.ListOpt{Group: group}).Return(nil, nil)
+ mockRepo.EXPECT().MeasureRegistry().Return(mockMeasure)
+
+ mockTrace := schema.NewMockTrace(ctrl)
+ mockTrace.EXPECT().ListTrace(gomock.Any(),
schema.ListOpt{Group: group}).Return(nil, nil)
+ mockRepo.EXPECT().TraceRegistry().Return(mockTrace)
+
+ mockTopN := schema.NewMockTopNAggregation(ctrl)
+ mockTopN.EXPECT().ListTopNAggregation(gomock.Any(),
schema.ListOpt{Group: group}).Return(nil, nil)
+ mockRepo.EXPECT().TopNAggregationRegistry().Return(mockTopN)
+
+ mockGroup := schema.NewMockGroup(ctrl)
+ mockGroup.EXPECT().GetGroup(gomock.Any(),
group).Return(&commonv1.Group{
+ Metadata: &commonv1.Metadata{Name: group},
+ Catalog: commonv1.Catalog_CATALOG_STREAM,
+ }, nil)
+ mockRepo.EXPECT().DropGroup(gomock.Any(),
commonv1.Catalog_CATALOG_STREAM, group).Return(nil)
+ mockGroup.EXPECT().DeleteGroup(gomock.Any(), group).DoAndReturn(
+ func(_ context.Context, g string) (bool, error) {
+ go func() {
+ time.Sleep(10 * time.Millisecond)
+ gr.OnDelete(schema.Metadata{
+ TypeMeta: schema.TypeMeta{Kind:
schema.KindGroup},
+ Spec: &commonv1.Group{
+ Metadata:
&commonv1.Metadata{Name: g},
+ ResourceOpts:
&commonv1.ResourceOpts{ShardNum: 1},
+ Catalog:
commonv1.Catalog_CATALOG_STREAM,
+ },
+ })
+ }()
+ return true, nil
+ },
+ )
+ mockRepo.EXPECT().GroupRegistry().Return(mockGroup).Times(2)
+
+ m := &groupDeletionTaskManager{
+ schemaRegistry: mockRepo,
+ propServer: propApplier,
+ groupRepo: gr,
+ log: logger.GetLogger("test"),
+ }
+ require.NoError(t, m.startDeletion(context.Background(), group,
false))
+ require.Eventually(t, func() bool {
+ acquireErr := gr.acquireRequest(group)
+ if acquireErr == nil {
+ gr.releaseRequest(group)
+ return false
+ }
+ return errors.Is(acquireErr, errGroupPendingDeletion)
+ }, 2*time.Second, 10*time.Millisecond)
+
+ pendingTask, queryErr :=
m.getDeletionTask(context.Background(), group)
+ require.NoError(t, queryErr)
+ assert.Equal(t, databasev1.GroupDeletionTask_PHASE_PENDING,
pendingTask.GetCurrentPhase())
+ assert.Equal(t, int64(512), pendingTask.GetTotalDataSizeBytes())
+
+ gr.releaseRequest(group)
+ require.Eventually(t, func() bool {
+ statusTask, statusErr :=
m.getDeletionTask(context.Background(), group)
+ return statusErr == nil && statusTask.GetCurrentPhase()
== databasev1.GroupDeletionTask_PHASE_COMPLETED
+ }, 5*time.Second, 20*time.Millisecond)
+
+ finalTask, finalErr := m.getDeletionTask(context.Background(),
group)
+ require.NoError(t, finalErr)
+ assert.Equal(t, databasev1.GroupDeletionTask_PHASE_COMPLETED,
finalTask.GetCurrentPhase())
+ assert.Equal(t, "group deleted successfully",
finalTask.GetMessage())
+ })
+}
diff --git a/banyand/liaison/grpc/discovery.go
b/banyand/liaison/grpc/discovery.go
index db6953dd5..536e9a484 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -34,7 +34,10 @@ import (
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
-var errNotExist = errors.New("the object doesn't exist")
+var (
+ errNotExist = errors.New("the object doesn't exist")
+ errGroupPendingDeletion = errors.New("group is pending deletion")
+)
type discoveryService struct {
metadataRepo metadata.Repo
@@ -138,13 +141,71 @@ func (i identity) String() string {
var _ schema.EventHandler = (*groupRepo)(nil)
+type groupInflight struct {
+ done chan struct{}
+ wg sync.WaitGroup
+}
+
type groupRepo struct {
schema.UnimplementedOnInitHandler
log *logger.Logger
resourceOpts map[string]*commonv1.ResourceOpts
+ inflight map[string]*groupInflight
sync.RWMutex
}
+func (s *groupRepo) acquireRequest(groupName string) error {
+ s.RWMutex.Lock()
+ defer s.RWMutex.Unlock()
+ item, ok := s.inflight[groupName]
+ if ok && item.done != nil {
+ return fmt.Errorf("%s: %w", groupName, errGroupPendingDeletion)
+ }
+ if !ok {
+ item = &groupInflight{}
+ s.inflight[groupName] = item
+ }
+ item.wg.Add(1)
+ return nil
+}
+
+func (s *groupRepo) releaseRequest(groupName string) {
+ s.RWMutex.RLock()
+ item, ok := s.inflight[groupName]
+ s.RWMutex.RUnlock()
+ if ok {
+ item.wg.Done()
+ }
+}
+
+func (s *groupRepo) waitInflightRequests(groupName string) <-chan struct{} {
+ s.RWMutex.Lock()
+ item, ok := s.inflight[groupName]
+ if !ok {
+ item = &groupInflight{}
+ s.inflight[groupName] = item
+ }
+ if item.done != nil {
+ ch := item.done
+ s.RWMutex.Unlock()
+ return ch
+ }
+ ch := make(chan struct{})
+ item.done = ch
+ s.RWMutex.Unlock()
+ go func(wg *sync.WaitGroup, done chan struct{}) {
+ wg.Wait()
+ close(done)
+ }(&item.wg, ch)
+ return ch
+}
+
+func (s *groupRepo) markDeleted(groupName string) {
+ s.RWMutex.Lock()
+ defer s.RWMutex.Unlock()
+ delete(s.inflight, groupName)
+}
+
func (s *groupRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
if schemaMetadata.Kind != schema.KindGroup {
return
diff --git a/banyand/liaison/grpc/discovery_test.go
b/banyand/liaison/grpc/discovery_test.go
new file mode 100644
index 000000000..291b6786d
--- /dev/null
+++ b/banyand/liaison/grpc/discovery_test.go
@@ -0,0 +1,114 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestGroupRepo_AcquireAndRelease(t *testing.T) {
+ gr := &groupRepo{
+ log: logger.GetLogger("test"),
+ resourceOpts: make(map[string]*commonv1.ResourceOpts),
+ inflight: make(map[string]*groupInflight),
+ }
+
+ require.NoError(t, gr.acquireRequest("test-group"))
+ require.NoError(t, gr.acquireRequest("test-group"))
+ gr.releaseRequest("test-group")
+ gr.releaseRequest("test-group")
+
+ gr.RWMutex.RLock()
+ item, ok := gr.inflight["test-group"]
+ gr.RWMutex.RUnlock()
+ assert.True(t, ok)
+ assert.Nil(t, item.done)
+}
+
+func TestGroupRepo_AcquireDuringPendingDeletion(t *testing.T) {
+ gr := &groupRepo{
+ log: logger.GetLogger("test"),
+ resourceOpts: make(map[string]*commonv1.ResourceOpts),
+ inflight: make(map[string]*groupInflight),
+ }
+
+ _ = gr.waitInflightRequests("del-group")
+ err := gr.acquireRequest("del-group")
+ assert.ErrorIs(t, err, errGroupPendingDeletion)
+}
+
+func TestGroupRepo_WaitInflightRequests(t *testing.T) {
+ tests := []struct {
+ name string
+ acquireNum int
+ }{
+ {name: "completes after release", acquireNum: 1},
+ {name: "no inflight", acquireNum: 0},
+ }
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ gr := &groupRepo{
+ log: logger.GetLogger("test"),
+ resourceOpts:
make(map[string]*commonv1.ResourceOpts),
+ inflight: make(map[string]*groupInflight),
+ }
+ for range tc.acquireNum {
+ require.NoError(t, gr.acquireRequest("g"))
+ }
+ done := gr.waitInflightRequests("g")
+ if tc.acquireNum > 0 {
+ select {
+ case <-done:
+ t.Fatal("done channel should not be
closed while requests are in flight")
+ default:
+ }
+ for range tc.acquireNum {
+ gr.releaseRequest("g")
+ }
+ }
+ select {
+ case <-done:
+ case <-time.After(2 * time.Second):
+ t.Fatal("done channel did not close after all
requests were released")
+ }
+ })
+ }
+}
+
+func TestGroupRepo_MarkDeleted(t *testing.T) {
+ gr := &groupRepo{
+ log: logger.GetLogger("test"),
+ resourceOpts: make(map[string]*commonv1.ResourceOpts),
+ inflight: make(map[string]*groupInflight),
+ }
+
+ gr.waitInflightRequests("g3")
+ gr.markDeleted("g3")
+
+ gr.RWMutex.RLock()
+ _, ok := gr.inflight["g3"]
+ gr.RWMutex.RUnlock()
+ assert.False(t, ok)
+}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 125c8aeaa..8076a9c61 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -138,14 +138,22 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group,
"measure", "write")
+		if acquireErr := ms.groupRepo.acquireRequest(metadata.Group); acquireErr != nil {
+ ms.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+ continue
+ }
+
if status := ms.validateWriteRequest(writeRequest, metadata,
measure); status != modelv1.Status_STATUS_SUCCEED {
+ ms.groupRepo.releaseRequest(metadata.Group)
continue
}
if err := ms.processAndPublishRequest(ctx, writeRequest,
metadata, spec,
specEntityLocator, specShardingKeyLocator, publisher,
&succeedSent, measure, nodeMetadataSent, nodeSpecSent); err != nil {
+ ms.groupRepo.releaseRequest(metadata.Group)
continue
}
+ ms.groupRepo.releaseRequest(metadata.Group)
}
}
@@ -358,6 +366,16 @@ func (ms *measureService) handleWriteCleanup(publisher
queue.BatchPublisher, suc
var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints:
make([]*measurev1.DataPoint, 0)}
func (ms *measureService) Query(ctx context.Context, req
*measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+		if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr != nil {
+			return nil, status.Errorf(codes.FailedPrecondition, "group %s is pending deletion", g)
+ }
+ }
+ defer func() {
+ for _, g := range req.Groups {
+ ms.groupRepo.releaseRequest(g)
+ }
+ }()
for _, g := range req.Groups {
ms.metrics.totalStarted.Inc(1, g, "measure", "query")
}
@@ -424,6 +442,16 @@ func (ms *measureService) Query(ctx context.Context, req
*measurev1.QueryRequest
}
func (ms *measureService) TopN(ctx context.Context, topNRequest
*measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
+ for _, g := range topNRequest.GetGroups() {
+ if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr !=
nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
+ }
+ }
+ defer func() {
+ for _, g := range topNRequest.GetGroups() {
+ ms.groupRepo.releaseRequest(g)
+ }
+ }()
start := time.Now()
defer func() {
duration := time.Since(start)
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index c62957991..0393d759f 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -28,13 +28,14 @@ import (
"github.com/pkg/errors"
"go.uber.org/multierr"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -148,8 +149,8 @@ func (ps *propertyServer) validatePropertyTags(ctx
context.Context, property *pr
found := false
for _, ts := range propSchema.Tags {
if ts.Name == tag.Key {
- typ :=
databasev1.TagType(pbv1.MustTagValueToValueType(tag.Value))
- if typ !=
databasev1.TagType_TAG_TYPE_UNSPECIFIED && ts.Type != typ {
+ typ := pbv1.MustTagValueToValueType(tag.Value)
+ if typ != pbv1.ValueTypeUnknown &&
pbv1.MustTagValueSpecToValueType(ts.Type) != typ {
return errors.Errorf("property %s tag
%s type mismatch", property.Metadata.Name, tag.Key)
}
found = true
@@ -191,6 +192,10 @@ func (ps *propertyServer) Apply(ctx context.Context, req
*propertyv1.ApplyReques
return nil, err
}
g := req.Property.Metadata.Group
+ if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr != nil {
+ return nil, status.Errorf(codes.FailedPrecondition, "group %s
is pending deletion", g)
+ }
+ defer ps.groupRepo.releaseRequest(g)
ps.metrics.totalStarted.Inc(1, g, "property", "apply")
start := time.Now()
defer func() {
@@ -370,6 +375,10 @@ func (ps *propertyServer) Delete(ctx context.Context, req
*propertyv1.DeleteRequ
return nil, schema.BadRequest("name", "name should not be nil")
}
g := req.Group
+ if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr != nil {
+ return nil, status.Errorf(codes.FailedPrecondition, "group %s
is pending deletion", g)
+ }
+ defer ps.groupRepo.releaseRequest(g)
ps.metrics.totalStarted.Inc(1, g, "property", "delete")
start := time.Now()
defer func() {
@@ -415,6 +424,16 @@ func (ps *propertyServer) Delete(ctx context.Context, req
*propertyv1.DeleteRequ
}
func (ps *propertyServer) Query(ctx context.Context, req
*propertyv1.QueryRequest) (resp *propertyv1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr !=
nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
+ }
+ }
+ defer func() {
+ for _, g := range req.Groups {
+ ps.groupRepo.releaseRequest(g)
+ }
+ }()
ps.metrics.totalStarted.Inc(1, "", "property", "query")
start := time.Now()
defer func() {
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 8c0a2dc48..1e9af0c15 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -577,8 +577,9 @@ func (rs *measureRegistryServer) Exist(ctx context.Context,
type groupRegistryServer struct {
databasev1.UnimplementedGroupRegistryServiceServer
- schemaRegistry metadata.Repo
- metrics *metrics
+ schemaRegistry metadata.Repo
+ deletionTaskManager *groupDeletionTaskManager
+ metrics *metrics
}
func (rs *groupRegistryServer) Create(ctx context.Context, req
*databasev1.GroupRegistryServiceCreateRequest) (
@@ -618,20 +619,53 @@ func (rs *groupRegistryServer) Update(ctx
context.Context, req *databasev1.Group
func (rs *groupRegistryServer) Delete(ctx context.Context, req
*databasev1.GroupRegistryServiceDeleteRequest) (
*databasev1.GroupRegistryServiceDeleteResponse, error,
) {
- g := ""
+ g := req.GetGroup()
rs.metrics.totalRegistryStarted.Inc(1, g, "group", "delete")
start := time.Now()
defer func() {
rs.metrics.totalRegistryFinished.Inc(1, g, "group", "delete")
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group",
"delete")
}()
- deleted, err := rs.schemaRegistry.GroupRegistry().DeleteGroup(ctx,
req.GetGroup())
- if err != nil {
+ if g == internalDeletionTaskGroup {
rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
- return nil, err
+ return nil, status.Errorf(codes.PermissionDenied, "cannot
delete internal system group %s", g)
+ }
+ if _, getErr := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, g);
getErr != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+ return nil, getErr
+ }
+ if req.GetDryRun() {
+ return rs.dryRunDelete(ctx, g)
+ }
+ if !req.GetForce() {
+ hasResources, checkErr :=
rs.deletionTaskManager.hasNonEmptyResources(ctx, g)
+ if checkErr != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+ return nil, checkErr
+ }
+ if hasResources {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+ return nil, status.Errorf(codes.FailedPrecondition,
+ "group %s is not empty, use force=true to
delete non-empty groups", g)
+ }
+ }
+ if startErr := rs.deletionTaskManager.startDeletion(ctx, g,
req.GetDataOnly()); startErr != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+ return nil, startErr
+ }
+ return &databasev1.GroupRegistryServiceDeleteResponse{}, nil
+}
+
+func (rs *groupRegistryServer) dryRunDelete(ctx context.Context, g string) (
+ *databasev1.GroupRegistryServiceDeleteResponse, error,
+) {
+ schemaInfo, schemaErr := rs.collectSchemaInfo(ctx, g)
+ if schemaErr != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+ return nil, schemaErr
}
return &databasev1.GroupRegistryServiceDeleteResponse{
- Deleted: deleted,
+ SchemaInfo: schemaInfo,
}, nil
}
@@ -738,10 +772,24 @@ func (rs *groupRegistryServer) Inspect(ctx
context.Context, req *databasev1.Grou
}, nil
}
-func (rs *groupRegistryServer) Query(_ context.Context, _
*databasev1.GroupRegistryServiceQueryRequest) (
+func (rs *groupRegistryServer) Query(ctx context.Context, req
*databasev1.GroupRegistryServiceQueryRequest) (
*databasev1.GroupRegistryServiceQueryResponse, error,
) {
- return nil, status.Error(codes.Unimplemented, "Query method not
implemented yet")
+ g := req.GetGroup()
+ rs.metrics.totalRegistryStarted.Inc(1, g, "group", "query")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "group", "query")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group",
"query")
+ }()
+ task, queryErr := rs.deletionTaskManager.getDeletionTask(ctx, g)
+ if queryErr != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "query")
+ return nil, queryErr
+ }
+ return &databasev1.GroupRegistryServiceQueryResponse{
+ Task: task,
+ }, nil
}
func (rs *groupRegistryServer) collectSchemaInfo(ctx context.Context, group
string) (*databasev1.SchemaInfo, error) {
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 5fb4b2c1b..a78ebd004 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -141,7 +141,10 @@ func NewServer(_ context.Context, tir1Client, tir2Client,
broadcaster queue.Clie
schemaRegistry metadata.Repo, nr NodeRegistries, omr
observability.MetricsRegistry,
protectorService protector.Memory, routeProviders
map[string]route.TableProvider,
) Server {
- gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)}
+ gr := &groupRepo{
+ resourceOpts: make(map[string]*commonv1.ResourceOpts),
+ inflight: make(map[string]*groupInflight),
+ }
er := &entityRepo{entitiesMap: make(map[identity]partition.Locator),
measureMap: make(map[identity]*databasev1.Measure)}
streamSVC := &streamService{
discoveryService: newDiscoveryService(schema.KindStream,
schemaRegistry, nr.StreamLiaisonNodeRegistry, gr),
@@ -247,6 +250,12 @@ func (s *server) PreRun(ctx context.Context) error {
s.traceSVC.setLogger(s.log.Named("trace"))
s.propertyServer.SetLogger(s.log)
s.bydbQLSVC.setLogger(s.log.Named("bydbql"))
+ s.groupRegistryServer.deletionTaskManager = newGroupDeletionTaskManager(
+ s.groupRegistryServer.schemaRegistry, s.propertyServer,
s.groupRepo, s.log.Named("group-deletion"),
+ )
+ if initErr :=
s.groupRegistryServer.deletionTaskManager.initPropertyStorage(ctx); initErr !=
nil {
+ return initErr
+ }
components := []*discoveryService{
s.streamSVC.discoveryService,
s.measureSVC.discoveryService,
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index af570cfaa..ba26b3ef4 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -285,7 +285,13 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
requestCount++
s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group,
"stream", "write")
+ if acquireErr := s.groupRepo.acquireRequest(metadata.Group);
acquireErr != nil {
+ s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
+ continue
+ }
+
if s.validateWriteRequest(writeEntity, metadata, stream) !=
modelv1.Status_STATUS_SUCCEED {
+ s.groupRepo.releaseRequest(metadata.Group)
continue
}
@@ -293,6 +299,7 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
if err != nil {
s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("navigation failed")
s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
+ s.groupRepo.releaseRequest(metadata.Group)
continue
}
@@ -306,8 +313,10 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
if err != nil {
s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("publishing failed")
s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
+ s.groupRepo.releaseRequest(metadata.Group)
continue
}
+ s.groupRepo.releaseRequest(metadata.Group)
succeedSent = append(succeedSent, succeedSentMessage{
metadata: metadata,
@@ -320,6 +329,16 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements:
make([]*streamv1.Element, 0)}
func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest)
(resp *streamv1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ if acquireErr := s.groupRepo.acquireRequest(g); acquireErr !=
nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
+ }
+ }
+ defer func() {
+ for _, g := range req.Groups {
+ s.groupRepo.releaseRequest(g)
+ }
+ }()
for _, g := range req.Groups {
s.metrics.totalStarted.Inc(1, g, "stream", "query")
}
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 083927c8e..3143d6418 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -374,7 +374,13 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
requestCount++
s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group,
"trace", "write")
+ if acquireErr := s.groupRepo.acquireRequest(metadata.Group);
acquireErr != nil {
+ s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
+ continue
+ }
+
if s.validateWriteRequest(writeEntity, metadata, specLocator,
stream) != modelv1.Status_STATUS_SUCCEED {
+ s.groupRepo.releaseRequest(metadata.Group)
continue
}
@@ -382,6 +388,7 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
if err != nil {
s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("navigation failed")
s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
+ s.groupRepo.releaseRequest(metadata.Group)
continue
}
@@ -395,8 +402,10 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
if err != nil {
s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("publishing failed")
s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
+ s.groupRepo.releaseRequest(metadata.Group)
continue
}
+ s.groupRepo.releaseRequest(metadata.Group)
succeedSent = append(succeedSent, succeedSentMessage{
metadata: metadata,
@@ -409,6 +418,16 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
var emptyTraceQueryResponse = &tracev1.QueryResponse{Traces:
make([]*tracev1.Trace, 0)}
func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest)
(resp *tracev1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ if acquireErr := s.groupRepo.acquireRequest(g); acquireErr !=
nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
+ }
+ }
+ defer func() {
+ for _, g := range req.Groups {
+ s.groupRepo.releaseRequest(g)
+ }
+ }()
for _, g := range req.Groups {
s.metrics.totalStarted.Inc(1, g, "trace", "query")
}
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index bf7c9fedf..21ff035c5 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -91,6 +91,10 @@ func (s *dataSVC) LoadGroup(name string)
(resourceSchema.Group, bool) {
return s.schemaRepo.LoadGroup(name)
}
+func (s *dataSVC) DropGroup(_ context.Context, groupName string) error {
+ return s.schemaRepo.DropGroup(groupName)
+}
+
func (s *dataSVC) GetRemovalSegmentsTimeRange(group string)
*timestamp.TimeRange {
return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
}
@@ -275,6 +279,9 @@ func (s *dataSVC) PreRun(ctx context.Context) error {
if subscribeErr :=
s.pipeline.Subscribe(data.TopicMeasureCollectDataInfo,
collectDataInfoListener); subscribeErr != nil {
return fmt.Errorf("failed to subscribe to collect data info
topic: %w", subscribeErr)
}
+ if dropGroupErr := s.pipeline.Subscribe(data.TopicMeasureDropGroup,
&dropGroupDataListener{s: s}); dropGroupErr != nil {
+ return fmt.Errorf("failed to subscribe to drop group topic:
%w", dropGroupErr)
+ }
if err = s.createDataNativeObservabilityGroup(ctx); err != nil {
return err
@@ -588,3 +595,19 @@ func (l *collectDataInfoListener) Rev(ctx context.Context,
message bus.Message)
}
return bus.NewMessage(message.ID(), dataInfo)
}
+
// dropGroupDataListener handles TopicMeasureDropGroup messages on a data
// node by deleting the group's local data files.
type dropGroupDataListener struct {
	*bus.UnImplementedHealthyListener
	s *dataSVC
}

// Rev decodes the drop-group request and drops the group's data, replying
// with the original request on success or a *common.Error on failure so the
// broadcaster can aggregate per-node results.
func (l *dropGroupDataListener) Rev(ctx context.Context, message bus.Message) bus.Message {
	req, ok := message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
	if !ok {
		return bus.NewMessage(message.ID(), common.NewError("invalid data type for drop group request"))
	}
	if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
		return bus.NewMessage(message.ID(), common.NewError("failed to drop group data: %v", dropErr))
	}
	return bus.NewMessage(message.ID(), req)
}
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index 6e946aba5..7212c3316 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -75,6 +75,10 @@ func (s *liaison) LoadGroup(name string)
(resourceSchema.Group, bool) {
return s.schemaRepo.LoadGroup(name)
}
// DropGroup removes the measure group's local data files on this liaison
// node. The context is unused: the schema-repository drop is synchronous.
func (s *liaison) DropGroup(_ context.Context, groupName string) error {
	return s.schemaRepo.DropGroup(groupName)
}
+
func (s *liaison) GetRemovalSegmentsTimeRange(group string)
*timestamp.TimeRange {
return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
}
@@ -188,12 +192,16 @@ func (s *liaison) PreRun(ctx context.Context) error {
}
if metaSvc, ok := s.metadata.(metadata.Service); ok {
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_MEASURE, s)
+
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_MEASURE, s)
}
collectLiaisonInfoListener := &collectLiaisonInfoListener{s: s}
if subscribeErr :=
s.pipeline.Subscribe(data.TopicMeasureCollectLiaisonInfo,
collectLiaisonInfoListener); subscribeErr != nil {
return fmt.Errorf("failed to subscribe to collect liaison info
topic: %w", subscribeErr)
}
+ if subscribeErr := s.pipeline.Subscribe(data.TopicMeasureDropGroup,
&dropGroupLiaisonListener{s: s}); subscribeErr != nil {
+ return fmt.Errorf("failed to subscribe to drop group topic:
%w", subscribeErr)
+ }
return topNResultPipeline.Subscribe(data.TopicMeasureWrite,
writeListener)
}
@@ -238,3 +246,19 @@ func (l *collectLiaisonInfoListener) Rev(ctx
context.Context, message bus.Messag
}
return bus.NewMessage(message.ID(), liaisonInfo)
}
+
// dropGroupLiaisonListener handles TopicMeasureDropGroup messages on a
// liaison node by deleting the group's local data files.
type dropGroupLiaisonListener struct {
	*bus.UnImplementedHealthyListener
	s *liaison
}

// Rev decodes the drop-group request and drops the group's data, replying
// with the original request on success or a *common.Error on failure.
func (l *dropGroupLiaisonListener) Rev(ctx context.Context, message bus.Message) bus.Message {
	req, ok := message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
	if !ok {
		return bus.NewMessage(message.ID(), common.NewError("invalid data type for drop group liaison request"))
	}
	if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
		return bus.NewMessage(message.ID(), common.NewError("failed to drop group data on liaison: %v", dropErr))
	}
	return bus.NewMessage(message.ID(), req)
}
diff --git a/banyand/measure/svc_standalone.go
b/banyand/measure/svc_standalone.go
index 97e65e9b7..ebb4f9f7c 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -98,6 +98,10 @@ func (s *standalone) LoadGroup(name string)
(resourceSchema.Group, bool) {
return s.schemaRepo.LoadGroup(name)
}
// DropGroup removes the measure group's local data files in standalone mode.
// The context is unused: the schema-repository drop is synchronous.
func (s *standalone) DropGroup(_ context.Context, groupName string) error {
	return s.schemaRepo.DropGroup(groupName)
}
+
func (s *standalone) GetRemovalSegmentsTimeRange(group string)
*timestamp.TimeRange {
return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
}
@@ -274,6 +278,7 @@ func (s *standalone) PreRun(ctx context.Context) error {
if metaSvc, ok := s.metadata.(metadata.Service); ok {
metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_MEASURE,
s.schemaRepo)
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_MEASURE, s)
+
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_MEASURE, s)
}
s.cm = newCacheMetrics(s.omr)
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 2cbaa503c..0608e19ce 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -669,6 +669,10 @@ func (s *clientService) CollectLiaisonInfo(ctx
context.Context, group string) ([
return s.infoCollectorRegistry.CollectLiaisonInfo(ctx, group)
}
+func (s *clientService) DropGroup(ctx context.Context, catalog
commonv1.Catalog, group string) error {
+ return s.infoCollectorRegistry.DropGroup(ctx, catalog, group)
+}
+
func (s *clientService) RegisterDataCollector(catalog commonv1.Catalog,
collector schema.DataInfoCollector) {
s.infoCollectorRegistry.RegisterDataCollector(catalog, collector)
}
@@ -677,6 +681,10 @@ func (s *clientService) RegisterLiaisonCollector(catalog
commonv1.Catalog, colle
s.infoCollectorRegistry.RegisterLiaisonCollector(catalog, collector)
}
+func (s *clientService) RegisterGroupDropHandler(catalog commonv1.Catalog,
handler schema.GroupDropHandler) {
+ s.infoCollectorRegistry.RegisterGroupDropHandler(catalog, handler)
+}
+
func (s *clientService) SetDataBroadcaster(broadcaster bus.Broadcaster) {
s.dataBroadcaster = broadcaster
}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 2e21ffbc8..ac5b6508c 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -55,6 +55,7 @@ type Repo interface {
PropertyRegistry() schema.Property
CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, error)
CollectLiaisonInfo(context.Context, string) ([]*databasev1.LiaisonInfo,
error)
+ DropGroup(ctx context.Context, catalog commonv1.Catalog, group string)
error
}
// Service is the metadata repository.
@@ -69,4 +70,5 @@ type Service interface {
SetLiaisonBroadcaster(broadcaster bus.Broadcaster)
RegisterDataCollector(catalog commonv1.Catalog, collector
schema.DataInfoCollector)
RegisterLiaisonCollector(catalog commonv1.Catalog, collector
schema.LiaisonInfoCollector)
+ RegisterGroupDropHandler(catalog commonv1.Catalog, handler
schema.GroupDropHandler)
}
diff --git a/banyand/metadata/schema/collector.go
b/banyand/metadata/schema/collector.go
index 6f348d91b..d8ea0e1e0 100644
--- a/banyand/metadata/schema/collector.go
+++ b/banyand/metadata/schema/collector.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"
+ "go.uber.org/multierr"
+
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -36,11 +38,17 @@ type GroupGetter interface {
GetGroup(ctx context.Context, group string) (*commonv1.Group, error)
}
+// GroupDropHandler handles dropping group physical data files on the local
node.
+type GroupDropHandler interface {
+ DropGroup(ctx context.Context, group string) error
+}
+
// InfoCollectorRegistry manages data and liaison info collectors.
type InfoCollectorRegistry struct {
groupGetter GroupGetter
dataCollectors map[commonv1.Catalog]DataInfoCollector
liaisonCollectors map[commonv1.Catalog]LiaisonInfoCollector
+ dropHandlers map[commonv1.Catalog]GroupDropHandler
dataBroadcaster bus.Broadcaster
liaisonBroadcaster bus.Broadcaster
l *logger.Logger
@@ -53,6 +61,7 @@ func NewInfoCollectorRegistry(l *logger.Logger, groupGetter
GroupGetter) *InfoCo
groupGetter: groupGetter,
dataCollectors: make(map[commonv1.Catalog]DataInfoCollector),
liaisonCollectors:
make(map[commonv1.Catalog]LiaisonInfoCollector),
+ dropHandlers: make(map[commonv1.Catalog]GroupDropHandler),
l: l,
}
}
@@ -199,6 +208,72 @@ func (icr *InfoCollectorRegistry)
collectLiaisonInfoLocal(ctx context.Context, c
return nil, nil
}
// DropGroup drops the group's data files cluster-wide: first on this node
// (if a drop handler is registered for the catalog), then on all data and
// liaison nodes via broadcast.
func (icr *InfoCollectorRegistry) DropGroup(ctx context.Context, catalog commonv1.Catalog, group string) error {
	// A local failure aborts early; remote nodes are not contacted.
	if localErr := icr.dropGroupLocal(ctx, catalog, group); localErr != nil {
		return fmt.Errorf("failed to drop group locally: %w", localErr)
	}
	// Snapshot the broadcasters under the read lock; they are set
	// concurrently via SetDataBroadcaster/SetLiaisonBroadcaster.
	icr.mux.RLock()
	dataBroadcaster := icr.dataBroadcaster
	liaisonBroadcaster := icr.liaisonBroadcaster
	icr.mux.RUnlock()

	// Map the catalog to its drop-group topic. Catalogs without a topic
	// (the default branch) have no remote data to drop, so return nil.
	var topic bus.Topic
	switch catalog {
	case commonv1.Catalog_CATALOG_MEASURE:
		topic = data.TopicMeasureDropGroup
	case commonv1.Catalog_CATALOG_STREAM:
		topic = data.TopicStreamDropGroup
	case commonv1.Catalog_CATALOG_TRACE:
		topic = data.TopicTraceDropGroup
	default:
		return nil
	}

	// Broadcast to both planes, collecting failures instead of stopping at
	// the first so every reachable node still receives the request.
	var errs []error
	if dataBroadcaster != nil {
		if broadcastErr := icr.broadcastDropGroup(dataBroadcaster, topic, group); broadcastErr != nil {
			errs = append(errs, fmt.Errorf("data nodes: %w", broadcastErr))
		}
	}
	if liaisonBroadcaster != nil {
		if broadcastErr := icr.broadcastDropGroup(liaisonBroadcaster, topic, group); broadcastErr != nil {
			errs = append(errs, fmt.Errorf("liaison nodes: %w", broadcastErr))
		}
	}
	return multierr.Combine(errs...)
}
+
// broadcastDropGroup publishes a drop-group request on the given topic and
// waits for each node's reply, aggregating per-node failures into one error.
func (icr *InfoCollectorRegistry) broadcastDropGroup(broadcaster bus.Broadcaster, topic bus.Topic, group string) error {
	// The message ID only needs to distinguish this broadcast; the wall
	// clock in nanoseconds is sufficient here.
	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.GroupRegistryServiceDeleteRequest{Group: group})
	// The 30s timeout bounds how long a slow node can stall the deletion.
	futures, broadcastErr := broadcaster.Broadcast(30*time.Second, topic, message)
	if broadcastErr != nil {
		return fmt.Errorf("failed to broadcast drop group request: %w", broadcastErr)
	}
	var errs []error
	for _, future := range futures {
		msg, getErr := future.Get()
		if getErr != nil {
			errs = append(errs, getErr)
			continue
		}
		// Listeners reply with a *common.Error on failure and echo the
		// request payload on success.
		if errMsg, ok := msg.Data().(*common.Error); ok {
			errs = append(errs, fmt.Errorf("node reported error dropping group: %s", errMsg.Error()))
		}
	}
	return multierr.Combine(errs...)
}
+
+func (icr *InfoCollectorRegistry) dropGroupLocal(ctx context.Context, catalog
commonv1.Catalog, group string) error {
+ icr.mux.RLock()
+ handler, hasHandler := icr.dropHandlers[catalog]
+ icr.mux.RUnlock()
+ if hasHandler && handler != nil {
+ return handler.DropGroup(ctx, group)
+ }
+ return nil
+}
+
// RegisterDataCollector registers a data info collector for a specific
catalog.
func (icr *InfoCollectorRegistry) RegisterDataCollector(catalog
commonv1.Catalog, collector DataInfoCollector) {
icr.mux.Lock()
@@ -213,6 +288,13 @@ func (icr *InfoCollectorRegistry)
RegisterLiaisonCollector(catalog commonv1.Cata
icr.liaisonCollectors[catalog] = collector
}
+// RegisterGroupDropHandler registers a group drop handler for a specific
catalog.
+func (icr *InfoCollectorRegistry) RegisterGroupDropHandler(catalog
commonv1.Catalog, handler GroupDropHandler) {
+ icr.mux.Lock()
+ defer icr.mux.Unlock()
+ icr.dropHandlers[catalog] = handler
+}
+
// SetDataBroadcaster sets the broadcaster for data info collection.
func (icr *InfoCollectorRegistry) SetDataBroadcaster(broadcaster
bus.Broadcaster) {
icr.mux.Lock()
diff --git a/banyand/property/db/db.go b/banyand/property/db/db.go
index 7b3c2b88f..8c535b4d3 100644
--- a/banyand/property/db/db.go
+++ b/banyand/property/db/db.go
@@ -21,6 +21,7 @@ package db
import (
"context"
"errors"
+ "fmt"
"path"
"path/filepath"
"strconv"
@@ -75,6 +76,8 @@ type Database interface {
Repair(ctx context.Context, id []byte, shardID uint64, property
*propertyv1.Property, deleteTime int64) error
// TakeSnapShot takes a snapshot of the database.
TakeSnapShot(ctx context.Context, sn string) *databasev1.Snapshot
+ // Drop closes and removes all shards for the given group and deletes
the group directory.
+ Drop(groupName string) error
// RegisterGossip registers the repair scheduler's gossip services with
the given messenger.
RegisterGossip(messenger gossip.Messenger)
// Close closes the database.
@@ -380,6 +383,31 @@ func (db *database) getShard(group string, id
common.ShardID) (*shard, bool) {
return nil, false
}
// Drop closes and removes all shards for the given group and deletes the
// group directory. Dropping an unknown group is a no-op.
func (db *database) Drop(groupName string) (err error) {
	// LoadAndDelete unregisters the group first so no new operation can
	// reach it while its shards are torn down.
	// NOTE(review): if a shard close fails below, the group is already gone
	// from the map but its files remain on disk — confirm callers can
	// tolerate or retry this partial state.
	value, ok := db.groups.LoadAndDelete(groupName)
	if !ok {
		return nil
	}
	gs := value.(*groupShards)
	sLst := gs.shards.Load()
	if sLst != nil {
		// Close every shard, accumulating errors; bail out before removing
		// any files if a close failed.
		for _, s := range *sLst {
			multierr.AppendInto(&err, s.close())
		}
		if err != nil {
			return err
		}
	}
	// MustRMAll panics on failure; convert that panic into an error via the
	// named return so callers see a normal error instead of a crash.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("failed to remove group directory %s: %v", gs.location, r)
		}
	}()
	db.lfs.MustRMAll(gs.location)
	return nil
}
+
// RegisterGossip registers the repair scheduler's gossip services with the
given messenger.
func (db *database) RegisterGossip(messenger gossip.Messenger) {
if db.repairScheduler == nil {
diff --git a/banyand/property/service.go b/banyand/property/service.go
index c7bded2c3..5309245ec 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -31,9 +31,11 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
obsservice
"github.com/apache/skywalking-banyandb/banyand/observability/services"
"github.com/apache/skywalking-banyandb/banyand/property/db"
@@ -63,19 +65,19 @@ type service struct {
omr observability.MetricsRegistry
lfs fs.FileSystem
pm protector.Memory
- closer *run.Closer
db db.Database
+ closer *run.Closer
l *logger.Logger
- nodeID string
repairScheduler *repairScheduler
- root string
snapshotDir string
+ root string
repairDir string
repairBuildTreeCron string
repairTriggerCron string
- flushTimeout time.Duration
- expireTimeout time.Duration
+ nodeID string
repairQuickBuildTreeTime time.Duration
+ expireTimeout time.Duration
+ flushTimeout time.Duration
repairTreeSlotCount int
maxDiskUsagePercent int
maxFileSnapshotNum int
@@ -201,6 +203,7 @@ func (s *service) PreRun(ctx context.Context) error {
}
s.repairScheduler = scheduler
}
+ s.metadata.RegisterHandler("property", schema.KindGroup,
&propertyGroupEventHandler{svc: s})
return multierr.Combine(
s.pipeline.Subscribe(data.TopicPropertyUpdate,
&updateListener{s: s, l: s.l, path: path, maxDiskUsagePercent:
s.maxDiskUsagePercent}),
s.pipeline.Subscribe(data.TopicPropertyDelete,
&deleteListener{s: s}),
@@ -210,6 +213,30 @@ func (s *service) PreRun(ctx context.Context) error {
)
}
+type propertyGroupEventHandler struct {
+ svc *service
+}
+
+func (h *propertyGroupEventHandler) OnInit([]schema.Kind) (bool, []int64) {
+ return false, nil
+}
+
+func (h *propertyGroupEventHandler) OnAddOrUpdate(_ schema.Metadata) {}
+
+func (h *propertyGroupEventHandler) OnDelete(md schema.Metadata) {
+ if md.Kind != schema.KindGroup {
+ return
+ }
+ group := md.Spec.(*commonv1.Group)
+ if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
+ return
+ }
+ groupName := group.Metadata.GetName()
+ if dropErr := h.svc.db.Drop(groupName); dropErr != nil {
+ h.svc.l.Error().Err(dropErr).Str("group",
groupName).Msg("failed to drop group")
+ }
+}
+
func (s *service) Serve() run.StopNotify {
if s.gossipMessenger != nil {
s.gossipMessenger.Serve(s.closer)
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index 49fe0f286..78da92c9c 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -76,6 +76,10 @@ func (s *liaison) LoadGroup(name string)
(resourceSchema.Group, bool) {
return s.schemaRepo.LoadGroup(name)
}
// DropGroup removes the stream group's local data files on this liaison
// node. The context is unused: the schema-repository drop is synchronous.
func (s *liaison) DropGroup(_ context.Context, groupName string) error {
	return s.schemaRepo.DropGroup(groupName)
}
+
func (s *liaison) GetRemovalSegmentsTimeRange(group string)
*timestamp.TimeRange {
return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
}
@@ -196,12 +200,16 @@ func (s *liaison) PreRun(ctx context.Context) error {
if metaSvc, ok := s.metadata.(metadata.Service); ok {
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_STREAM, s)
+
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_STREAM, s)
}
collectLiaisonInfoListener := &collectLiaisonInfoListener{s: s}
if subscribeErr :=
s.pipeline.Subscribe(data.TopicStreamCollectLiaisonInfo,
collectLiaisonInfoListener); subscribeErr != nil {
return fmt.Errorf("failed to subscribe to collect liaison info
topic: %w", subscribeErr)
}
+ if subscribeErr := s.pipeline.Subscribe(data.TopicStreamDropGroup,
&dropGroupLiaisonListener{s: s}); subscribeErr != nil {
+ return fmt.Errorf("failed to subscribe to drop group topic:
%w", subscribeErr)
+ }
return s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
}
@@ -246,3 +254,19 @@ func (l *collectLiaisonInfoListener) Rev(ctx
context.Context, message bus.Messag
}
return bus.NewMessage(message.ID(), liaisonInfo)
}
+
// dropGroupLiaisonListener handles TopicStreamDropGroup messages on a
// liaison node by deleting the group's local data files.
type dropGroupLiaisonListener struct {
	*bus.UnImplementedHealthyListener
	s *liaison
}

// Rev decodes the drop-group request and drops the group's data, replying
// with the original request on success or a *common.Error on failure.
func (l *dropGroupLiaisonListener) Rev(ctx context.Context, message bus.Message) bus.Message {
	req, ok := message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
	if !ok {
		return bus.NewMessage(message.ID(), common.NewError("invalid data type for drop group liaison request"))
	}
	if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
		return bus.NewMessage(message.ID(), common.NewError("failed to drop group data on liaison: %v", dropErr))
	}
	return bus.NewMessage(message.ID(), req)
}
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index 7174ae6ab..9dab8a867 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -97,6 +97,10 @@ func (s *standalone) LoadGroup(name string)
(resourceSchema.Group, bool) {
return s.schemaRepo.LoadGroup(name)
}
// DropGroup removes the stream group's local data files in standalone mode.
// The context is unused: the schema-repository drop is synchronous.
func (s *standalone) DropGroup(_ context.Context, groupName string) error {
	return s.schemaRepo.DropGroup(groupName)
}
+
func (s *standalone) GetRemovalSegmentsTimeRange(group string)
*timestamp.TimeRange {
return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
}
@@ -255,6 +259,7 @@ func (s *standalone) PreRun(ctx context.Context) error {
if metaSvc, ok := s.metadata.(metadata.Service); ok {
metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_STREAM,
&s.schemaRepo)
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_STREAM, s)
+
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_STREAM, s)
}
if s.pipeline == nil {
return nil
@@ -264,6 +269,9 @@ func (s *standalone) PreRun(ctx context.Context) error {
if subscribeErr :=
s.pipeline.Subscribe(data.TopicStreamCollectDataInfo, collectDataInfoListener);
subscribeErr != nil {
return fmt.Errorf("failed to subscribe to collect data info
topic: %w", subscribeErr)
}
+ if dropGroupErr := s.pipeline.Subscribe(data.TopicStreamDropGroup,
&dropGroupDataListener{s: s}); dropGroupErr != nil {
+ return fmt.Errorf("failed to subscribe to drop group topic:
%w", dropGroupErr)
+ }
s.localPipeline = queue.Local()
if err = s.pipeline.Subscribe(data.TopicSnapshot, &snapshotListener{s:
s}); err != nil {
@@ -400,3 +408,19 @@ func (l *collectDataInfoListener) Rev(ctx context.Context,
message bus.Message)
}
return bus.NewMessage(message.ID(), dataInfo)
}
+
// dropGroupDataListener handles TopicStreamDropGroup messages in standalone
// mode by deleting the group's local data files.
type dropGroupDataListener struct {
	*bus.UnImplementedHealthyListener
	s *standalone
}

// Rev decodes the drop-group request and drops the group's data, replying
// with the original request on success or a *common.Error on failure.
func (l *dropGroupDataListener) Rev(ctx context.Context, message bus.Message) bus.Message {
	req, ok := message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
	if !ok {
		return bus.NewMessage(message.ID(), common.NewError("invalid data type for drop group request"))
	}
	if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
		return bus.NewMessage(message.ID(), common.NewError("failed to drop group data: %v", dropErr))
	}
	return bus.NewMessage(message.ID(), req)
}
diff --git a/banyand/trace/handoff_controller.go
b/banyand/trace/handoff_controller.go
index 18f8d66c2..bc45154f3 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -620,6 +620,52 @@ func (hc *handoffController) filterNodesForShard(nodes
[]string, group string, s
return filtered
}
// deletePartsByGroup removes every pending handoff part belonging to the
// given group from all node queues and releases the tracked disk usage. It
// is invoked when the group is deleted so stale parts are not replayed
// later. Failures on individual parts or queues are logged and skipped —
// cleanup is best-effort.
func (hc *handoffController) deletePartsByGroup(group string) {
	hc.mu.Lock()
	defer hc.mu.Unlock()

	var totalRemoved int
	for nodeAddr, nodeQueue := range hc.nodeQueues {
		pending, listErr := nodeQueue.listPending()
		if listErr != nil {
			hc.l.Warn().Err(listErr).Str("node", nodeAddr).Msg("failed to list pending parts for group cleanup")
			continue
		}
		for _, ptp := range pending {
			// The part's metadata identifies which group it belongs to.
			meta, metaErr := nodeQueue.getMetadata(ptp.PartID, ptp.PartType)
			if metaErr != nil {
				hc.l.Warn().Err(metaErr).
					Str("node", nodeAddr).
					Uint64("partID", ptp.PartID).
					Str("partType", ptp.PartType).
					Msg("failed to read metadata during group cleanup")
				continue
			}
			if meta.Group != group {
				continue
			}
			// complete() removes the part from the pending queue.
			if completeErr := nodeQueue.complete(ptp.PartID, ptp.PartType); completeErr != nil {
				hc.l.Warn().Err(completeErr).
					Str("node", nodeAddr).
					Uint64("partID", ptp.PartID).
					Str("partType", ptp.PartType).
					Msg("failed to remove part during group cleanup")
				continue
			}
			// Return the part's bytes to the disk-usage accounting only
			// after the removal actually succeeded.
			if meta.PartSizeBytes > 0 {
				hc.updateTotalSize(-int64(meta.PartSizeBytes))
			}
			totalRemoved++
		}
	}
	if totalRemoved > 0 {
		hc.l.Info().
			Str("group", group).
			Int("removedParts", totalRemoved).
			Msg("cleaned up handoff parts for deleted group")
	}
}
+
// close closes the handoff controller.
func (hc *handoffController) close() error {
// Stop the monitor
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 26f2e0ea3..7377c4201 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -58,11 +58,12 @@ type SchemaService interface {
type schemaRepo struct {
resourceSchema.Repository
- l *logger.Logger
- metadata metadata.Repo
- path string
- nodeID string
- role databasev1.Role
+ onGroupDelete func(groupName string)
+ l *logger.Logger
+ metadata metadata.Repo
+ path string
+ nodeID string
+ role databasev1.Role
}
func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string,
nodeID string) schemaRepo {
@@ -96,6 +97,9 @@ func newLiaisonSchemaRepo(path string, svc *liaison,
traceDataNodeRegistry grpc.
resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
),
}
+ if svc.handoffCtrl != nil {
+ sr.onGroupDelete = svc.handoffCtrl.deletePartsByGroup
+ }
sr.start()
return sr
}
@@ -200,6 +204,9 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
if g.Catalog != commonv1.Catalog_CATALOG_TRACE {
return
}
+ if sr.onGroupDelete != nil {
+ sr.onGroupDelete(g.Metadata.Name)
+ }
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindGroup,
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index 9ea23d179..5532ce011 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -104,6 +104,22 @@ func (cl *collectLiaisonInfoListener) Rev(ctx
context.Context, message bus.Messa
return bus.NewMessage(message.ID(), liaisonInfo)
}
// dropGroupLiaisonListener handles TopicTraceDropGroup messages on a liaison
// node by deleting the group's local data files.
type dropGroupLiaisonListener struct {
	*bus.UnImplementedHealthyListener
	l *liaison
}

// Rev decodes the drop-group request and drops the group's data, replying
// with the original request on success or a *common.Error on failure.
func (dl *dropGroupLiaisonListener) Rev(ctx context.Context, message bus.Message) bus.Message {
	req, ok := message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
	if !ok {
		return bus.NewMessage(message.ID(), common.NewError("invalid data type for drop group liaison request"))
	}
	if dropErr := dl.l.DropGroup(ctx, req.Group); dropErr != nil {
		return bus.NewMessage(message.ID(), common.NewError("failed to drop group data on liaison: %v", dropErr))
	}
	return bus.NewMessage(message.ID(), req)
}
+
// LiaisonService returns a new liaison service (deprecated - use NewLiaison).
func LiaisonService(_ context.Context) (Service, error) {
return &liaison{}, nil
@@ -280,12 +296,16 @@ func (l *liaison) PreRun(ctx context.Context) error {
if metaSvc, ok := l.metadata.(metadata.Service); ok {
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_TRACE, l)
+
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_TRACE, l)
}
collectLiaisonInfoListener := &collectLiaisonInfoListener{l: l}
if subscribeErr :=
l.pipeline.Subscribe(data.TopicTraceCollectLiaisonInfo,
collectLiaisonInfoListener); subscribeErr != nil {
return fmt.Errorf("failed to subscribe to collect liaison info
topic: %w", subscribeErr)
}
+ if subscribeErr := l.pipeline.Subscribe(data.TopicTraceDropGroup,
&dropGroupLiaisonListener{l: l}); subscribeErr != nil {
+ return fmt.Errorf("failed to subscribe to drop group topic:
%w", subscribeErr)
+ }
return l.pipeline.Subscribe(data.TopicTraceWrite, l.writeListener)
}
@@ -310,6 +330,10 @@ func (l *liaison) LoadGroup(name string)
(resourceSchema.Group, bool) {
return l.schemaRepo.LoadGroup(name)
}
// DropGroup removes the trace group's local data files on this liaison node.
// The context is unused: the schema-repository drop is synchronous.
func (l *liaison) DropGroup(_ context.Context, groupName string) error {
	return l.schemaRepo.DropGroup(groupName)
}
+
func (l *liaison) Trace(metadata *commonv1.Metadata) (Trace, error) {
sm, ok := l.schemaRepo.Trace(metadata)
if !ok {
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 0e42b5c5b..77dd2f873 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -162,11 +162,16 @@ func (s *standalone) PreRun(ctx context.Context) error {
if metaSvc, ok := s.metadata.(metadata.Service); ok {
metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_TRACE,
&s.schemaRepo)
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_TRACE, s)
+
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_TRACE, s)
}
subErr := s.pipeline.Subscribe(data.TopicTraceCollectDataInfo,
&collectDataInfoListener{s: s})
if subErr != nil {
return fmt.Errorf("failed to subscribe to
TopicTraceCollectDataInfo: %w", subErr)
}
+ dropGroupErr := s.pipeline.Subscribe(data.TopicTraceDropGroup,
&dropGroupDataListener{s: s})
+ if dropGroupErr != nil {
+ return fmt.Errorf("failed to subscribe to TopicTraceDropGroup:
%w", dropGroupErr)
+ }
// Initialize snapshot directory
s.snapshotDir = filepath.Join(path, "snapshots")
@@ -230,6 +235,10 @@ func (s *standalone) LoadGroup(name string)
(resourceSchema.Group, bool) {
return s.schemaRepo.LoadGroup(name)
}
// DropGroup removes the trace group's local data files in standalone mode.
// The context is unused: the schema-repository drop is synchronous.
func (s *standalone) DropGroup(_ context.Context, groupName string) error {
	return s.schemaRepo.DropGroup(groupName)
}
+
func (s *standalone) Trace(metadata *commonv1.Metadata) (Trace, error) {
sm, ok := s.schemaRepo.Trace(metadata)
if !ok {
@@ -563,6 +572,22 @@ func (l *collectDataInfoListener) Rev(ctx context.Context,
message bus.Message)
return bus.NewMessage(message.ID(), dataInfo)
}
// dropGroupDataListener handles TopicTraceDropGroup messages in standalone
// mode by deleting the group's local data files.
type dropGroupDataListener struct {
	*bus.UnImplementedHealthyListener
	s *standalone
}

// Rev decodes the drop-group request and drops the group's data, replying
// with the original request on success or a *common.Error on failure.
func (l *dropGroupDataListener) Rev(ctx context.Context, message bus.Message) bus.Message {
	req, ok := message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
	if !ok {
		return bus.NewMessage(message.ID(), common.NewError("invalid data type for drop group request"))
	}
	if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
		return bus.NewMessage(message.ID(), common.NewError("failed to drop group data: %v", dropErr))
	}
	return bus.NewMessage(message.ID(), req)
}
+
// NewService returns a new service.
func NewService(metadata metadata.Repo, pipeline queue.Server, omr
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
return &standalone{
diff --git a/bydbctl/internal/cmd/group_test.go
b/bydbctl/internal/cmd/group_test.go
index 1b5cde6be..ceac5023e 100644
--- a/bydbctl/internal/cmd/group_test.go
+++ b/bydbctl/internal/cmd/group_test.go
@@ -96,12 +96,13 @@ resource_opts:
})
It("delete group", func() {
- rootCmd.SetArgs([]string{"group", "delete", "-g", "group1"})
- out := capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("group group1 is deleted"))
+ Eventually(func(g Gomega) {
+ rootCmd.SetArgs([]string{"group", "delete", "-g",
"group1"})
+ out := capturer.CaptureStdout(func() {
+
g.Expect(rootCmd.Execute()).NotTo(HaveOccurred())
+ })
+ g.Expect(out).To(ContainSubstring("group group1 is
deleted"))
+ }).Should(Succeed())
})
It("list group", func() {
@@ -132,7 +133,7 @@ resource_opts:
})
resp := new(databasev1.GroupRegistryServiceListResponse)
helpers.UnmarshalYAML([]byte(out), resp)
- Expect(resp.Group).To(HaveLen(2)) // group1, group2 [internal
group]
+ Expect(resp.Group).To(HaveLen(3)) // group1, group2,
_deletion_task (internal group)
})
AfterEach(func() {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index a4f79d4a1..d75e4fd19 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2836,6 +2836,7 @@ GroupDeletionTask represents the status of a group
deletion operation.
| deleted_data_size_bytes | [int64](#int64) | | deleted_data_size_bytes is
the size of data that has been deleted in bytes. |
| message | [string](#string) | | message provides additional information
about the task status. |
| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | |
created_at is the timestamp when the task was created. |
+| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | |
updated_at is the timestamp when the task was last updated. |
@@ -2910,6 +2911,7 @@ GroupRegistryServiceDeleteRequest is the request for
deleting a group.
| group | [string](#string) | | group is the name of the group to delete. |
| dry_run | [bool](#bool) | | dry_run indicates whether to perform a dry run
without actually deleting data. When true, returns what would be deleted
without making changes. |
| force | [bool](#bool) | | force indicates whether to force delete the group
even if it contains data. When false, deletion will fail if the group is not
empty. |
+| data_only | [bool](#bool) | | data_only indicates whether to delete only
data files without removing metadata. When true, metadata are preserved. |
@@ -2924,8 +2926,7 @@ GroupRegistryServiceDeleteResponse is the response for
deleting a group.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
-| deleted | [bool](#bool) | | deleted indicates whether the group was
deleted. |
-| task_id | [string](#string) | | task_id is the ID of the background
deletion task. |
+| schema_info | [SchemaInfo](#banyandb-database-v1-SchemaInfo) | |
schema_info contains the schema resources that would be deleted (populated in
dry-run mode). |
diff --git a/pkg/index/inverted/inverted_series.go
b/pkg/index/inverted/inverted_series.go
index e7c4d013c..e14426ae7 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -44,6 +44,9 @@ func (s *store) InsertSeriesBatch(batch index.Batch) error {
return nil
}
if !s.closer.AddRunning() {
+ if batch.PersistentCallback != nil {
+ batch.PersistentCallback(errors.New("store is closed"))
+ }
return nil
}
defer s.closer.Done()
@@ -64,6 +67,9 @@ func (s *store) UpdateSeriesBatch(batch index.Batch) error {
return nil
}
if !s.closer.AddRunning() {
+ if batch.PersistentCallback != nil {
+ batch.PersistentCallback(errors.New("store is closed"))
+ }
return nil
}
defer s.closer.Done()
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index ac3937ec9..5d2c28400 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -27,7 +27,6 @@ import (
"time"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
@@ -296,11 +295,18 @@ func (sr *schemaRepo) createGroup(name string) (g *group)
{
func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
name := groupMeta.GetName()
g, loaded := sr.groupMap.LoadAndDelete(name)
- log.Info().Str("group", name).Bool("loaded", loaded).Msg("deleting
group")
if !loaded {
return nil
}
- return g.(*group).close()
+ grp := g.(*group)
+ return grp.close()
+}
+
+func (sr *schemaRepo) DropGroup(name string) error {
+ if g, ok := sr.groupMap.Load(name); ok {
+ return g.(*group).drop()
+ }
+ return nil
}
func (sr *schemaRepo) getGroup(name string) (*group, bool) {
@@ -570,3 +576,14 @@ func (g *group) close() (err error) {
}
return multierr.Append(err, g.SupplyTSDB().Close())
}
+
+func (g *group) drop() error {
+ if !g.isInit() || g.isPortable() {
+ return nil
+ }
+ db := g.db.Load()
+ if db == nil {
+ return nil
+ }
+ return db.(DB).Drop()
+}
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index e80a59a4c..e99bcce9c 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -97,6 +97,7 @@ type ResourceSupplier interface {
type DB interface {
io.Closer
UpdateOptions(opts *commonv1.ResourceOpts)
+ Drop() error
}
// Repository is the collection of several hierarchies groups by a "Group".
@@ -109,4 +110,5 @@ type Repository interface {
LoadResource(metadata *commonv1.Metadata) (Resource, bool)
Close()
StopCh() <-chan struct{}
+ DropGroup(groupName string) error
}
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 321ad5f4c..7dcd4412b 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -736,14 +736,7 @@ func DataNodeWithAddrAndDir(config *ClusterConfig, flags
...string) (string, str
}
}
-// LiaisonNode runs a liaison node.
-func LiaisonNode(config *ClusterConfig, flags ...string) (grpcAddr string, closeFn func()) {
- grpcAddr, _, closeFn = LiaisonNodeWithHTTP(config, flags...)
- return
-}
-
-// LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the gRPC and HTTP addresses.
-func LiaisonNodeWithHTTP(config *ClusterConfig, flags ...string) (string, string, func()) {
+func startLiaisonNode(config *ClusterConfig, path string, flags ...string) (string, string, func()) {
if config == nil {
config = defaultClusterConfig
}
@@ -752,7 +745,6 @@ func LiaisonNodeWithHTTP(config *ClusterConfig, flags
...string) (string, string
grpcAddr := fmt.Sprintf("%s:%d", host, ports[0])
httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
nodeHost := "127.0.0.1"
- path, deferFn, err := test.NewSpace()
logger.Infof("liaison test directory: %s", path)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
isPropertyMode := config.SchemaRegistry.Mode == ModeProperty
@@ -816,6 +808,33 @@ func LiaisonNodeWithHTTP(config *ClusterConfig, flags
...string) (string, string
})
fmt.Println("done")
closeFn()
+ }
+}
+
+// LiaisonNode runs a liaison node.
+func LiaisonNode(config *ClusterConfig, flags ...string) (grpcAddr string, closeFn func()) {
+ grpcAddr, _, closeFn = LiaisonNodeWithHTTP(config, flags...)
+ return
+}
+
+// LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the gRPC and HTTP addresses.
+func LiaisonNodeWithHTTP(config *ClusterConfig, flags ...string) (string, string, func()) {
+ dataDir, deferFn, dirErr := test.NewSpace()
+ gomega.Expect(dirErr).NotTo(gomega.HaveOccurred())
+ grpcAddr, httpAddr, closeFn := startLiaisonNode(config, dataDir, flags...)
+ return grpcAddr, httpAddr, func() {
+ closeFn()
+ deferFn()
+ }
+}
+
+// LiaisonNodeWithAddrAndDir runs a liaison node and returns the gRPC address, root data path, and closer.
+func LiaisonNodeWithAddrAndDir(config *ClusterConfig, flags ...string) (string, string, func()) {
+ dataDir, deferFn, dirErr := test.NewSpace()
+ gomega.Expect(dirErr).NotTo(gomega.HaveOccurred())
+ grpcAddr, _, closeFn := startLiaisonNode(config, dataDir, flags...)
+ return grpcAddr, dataDir, func() {
+ closeFn()
+ deferFn()
+ }
+}
diff --git a/test/integration/distributed/deletion/deletion_suite_test.go
b/test/integration/distributed/deletion/deletion_suite_test.go
new file mode 100644
index 000000000..8c0015cc6
--- /dev/null
+++ b/test/integration/distributed/deletion/deletion_suite_test.go
@@ -0,0 +1,313 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_deletion_test provides integration tests for group deletion in distributed mode.
+package integration_deletion_test
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/status"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+func TestDeletion(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Distributed Deletion Suite")
+}
+
+var (
+ deferFunc func()
+ goods []gleak.Goroutine
+ connection *grpc.ClientConn
+ groupClient databasev1.GroupRegistryServiceClient
+ dataNode0Path string
+ dataNode1Path string
+ liaisonNodePath string
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(Succeed())
+ pool.EnableStackTracking(true)
+ goods = gleak.Goroutines()
+
+ By("Starting etcd server")
+ ports, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+ dir, spaceDef, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ server, err := embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ Expect(err).ShouldNot(HaveOccurred())
+ <-server.ReadyNotify()
+
+ clusterConfig := setup.EtcdClusterConfig(ep)
+ By("Starting data node 0")
+ _, dn0Path, closeDataNode0 :=
setup.DataNodeWithAddrAndDir(clusterConfig)
+ By("Starting data node 1")
+ _, dn1Path, closeDataNode1 :=
setup.DataNodeWithAddrAndDir(clusterConfig)
+ By("Starting liaison node")
+ liaisonAddr, liaisonPath, closerLiaisonNode :=
setup.LiaisonNodeWithAddrAndDir(clusterConfig)
+
+ deferFunc = func() {
+ closerLiaisonNode()
+ closeDataNode0()
+ closeDataNode1()
+ _ = server.Close()
+ <-server.StopNotify()
+ spaceDef()
+ }
+
+ return []byte(liaisonAddr + "," + dn0Path + "," + dn1Path + "," +
liaisonPath)
+}, func(address []byte) {
+ parts := strings.SplitN(string(address), ",", 4)
+ liaisonGrpcAddr := parts[0]
+ dataNode0Path = parts[1]
+ dataNode1Path = parts[2]
+ liaisonNodePath = parts[3]
+
+ var connErr error
+ connection, connErr = grpchelper.Conn(liaisonGrpcAddr, 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(connErr).NotTo(HaveOccurred())
+
+ groupClient = databasev1.NewGroupRegistryServiceClient(connection)
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {})
+
+var _ = ReportAfterSuite("Distributed Deletion Suite", func(report Report) {
+ if report.SuiteSucceeded {
+ if deferFunc != nil {
+ deferFunc()
+ }
+ Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+ }
+})
+
+var _ = Describe("GroupDeletion", func() {
+ It("returns NotFound when deleting a nonexistent group", func() {
+ _, err := groupClient.Delete(context.TODO(),
&databasev1.GroupRegistryServiceDeleteRequest{
+ Group: "nonexistent-group-dist-xyz",
+ })
+ Expect(err).Should(HaveOccurred())
+ errStatus, ok := status.FromError(err)
+ Expect(ok).To(BeTrue())
+ Expect(errStatus.Code()).To(Equal(codes.NotFound))
+ })
+
+ It("can delete an empty group without force flag", func() {
+ const newGroup = "dist-empty-deletion-group"
+ dn0StreamDir := filepath.Join(dataNode0Path, "stream", "data",
newGroup)
+ dn1StreamDir := filepath.Join(dataNode1Path, "stream", "data",
newGroup)
+ liaisonStreamDir := filepath.Join(liaisonNodePath, "stream",
"data", newGroup)
+ By("Creating a new empty group")
+ _, err := groupClient.Create(context.TODO(),
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{Name: newGroup},
+ Catalog: commonv1.Catalog_CATALOG_STREAM,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+
+ By("Verifying the group exists")
+ getResp, getErr := groupClient.Get(context.TODO(),
&databasev1.GroupRegistryServiceGetRequest{Group: newGroup})
+ Expect(getErr).ShouldNot(HaveOccurred())
+
Expect(getResp.GetGroup().GetMetadata().GetName()).To(Equal(newGroup))
+
+ By("Verifying group data directories exist on all nodes")
+ Eventually(func() bool {
+ _, statErr0 := os.Stat(dn0StreamDir)
+ _, statErr1 := os.Stat(dn1StreamDir)
+ _, statErrL := os.Stat(liaisonStreamDir)
+ return statErr0 == nil && statErr1 == nil && statErrL
== nil
+ }, flags.EventuallyTimeout).Should(BeTrue())
+
+ By("Deleting the empty group without force")
+ Eventually(func() error {
+ _, deleteErr := groupClient.Delete(context.TODO(),
&databasev1.GroupRegistryServiceDeleteRequest{
+ Group: newGroup,
+ })
+ return deleteErr
+ }, flags.EventuallyTimeout).Should(Succeed())
+
+ By("Verifying the group is eventually removed")
+ Eventually(func() codes.Code {
+ _, getErr := groupClient.Get(context.TODO(),
&databasev1.GroupRegistryServiceGetRequest{Group: newGroup})
+ if getErr == nil {
+ return codes.OK
+ }
+ st, _ := status.FromError(getErr)
+ return st.Code()
+ }, flags.EventuallyTimeout).Should(Equal(codes.NotFound))
+
+ By("Verifying group data directories are removed from all
nodes")
+ Eventually(func() bool {
+ _, statErr0 := os.Stat(dn0StreamDir)
+ _, statErr1 := os.Stat(dn1StreamDir)
+ _, statErrL := os.Stat(liaisonStreamDir)
+ return os.IsNotExist(statErr0) &&
os.IsNotExist(statErr1) && os.IsNotExist(statErrL)
+ }, flags.EventuallyTimeout).Should(BeTrue())
+ })
+
+ It("can delete an existing group with force=true", func() {
+ const groupName = "dist-force-deletion-group"
+ dn0MeasureDir := filepath.Join(dataNode0Path, "measure",
"data", groupName)
+ dn1MeasureDir := filepath.Join(dataNode1Path, "measure",
"data", groupName)
+ liaisonMeasureDir := filepath.Join(liaisonNodePath, "measure",
"data", groupName)
+ By("Creating a group with resources")
+ _, err := groupClient.Create(context.TODO(),
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{Name: groupName},
+ Catalog: commonv1.Catalog_CATALOG_MEASURE,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+
+ By("Verifying group data directories exist on all nodes")
+ Eventually(func() bool {
+ _, statErr0 := os.Stat(dn0MeasureDir)
+ _, statErr1 := os.Stat(dn1MeasureDir)
+ _, statErrL := os.Stat(liaisonMeasureDir)
+ return statErr0 == nil && statErr1 == nil && statErrL
== nil
+ }, flags.EventuallyTimeout).Should(BeTrue())
+
+ By("Deleting group with force=true")
+ _, err = groupClient.Delete(context.TODO(),
&databasev1.GroupRegistryServiceDeleteRequest{
+ Group: groupName,
+ Force: true,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+
+ By("Verifying the group is eventually removed")
+ Eventually(func() codes.Code {
+ _, getErr := groupClient.Get(context.TODO(),
&databasev1.GroupRegistryServiceGetRequest{Group: groupName})
+ if getErr == nil {
+ return codes.OK
+ }
+ st, _ := status.FromError(getErr)
+ return st.Code()
+ }, flags.EventuallyTimeout).Should(Equal(codes.NotFound))
+
+ By("Verifying group data directories are removed from all
nodes")
+ Eventually(func() bool {
+ _, statErr0 := os.Stat(dn0MeasureDir)
+ _, statErr1 := os.Stat(dn1MeasureDir)
+ _, statErrL := os.Stat(liaisonMeasureDir)
+ return os.IsNotExist(statErr0) &&
os.IsNotExist(statErr1) && os.IsNotExist(statErrL)
+ }, flags.EventuallyTimeout).Should(BeTrue())
+ })
+
+ It("can query deletion task status until completed", func() {
+ const groupName = "dist-deletion-task-group"
+ By("Creating a group to delete")
+ _, err := groupClient.Create(context.TODO(),
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{Name: groupName},
+ Catalog: commonv1.Catalog_CATALOG_MEASURE,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+
+ By("Initiating group deletion with force=true")
+ _, err = groupClient.Delete(context.TODO(),
&databasev1.GroupRegistryServiceDeleteRequest{
+ Group: groupName,
+ Force: true,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+
+ By("Waiting for deletion task to reach COMPLETED")
+ Eventually(func() databasev1.GroupDeletionTask_Phase {
+ queryResp, queryErr :=
groupClient.Query(context.TODO(), &databasev1.GroupRegistryServiceQueryRequest{
+ Group: groupName,
+ })
+ if queryErr != nil {
+ return
databasev1.GroupDeletionTask_PHASE_UNSPECIFIED
+ }
+ return queryResp.GetTask().GetCurrentPhase()
+ }, flags.EventuallyTimeout,
100*time.Millisecond).Should(Equal(databasev1.GroupDeletionTask_PHASE_COMPLETED))
+ })
+})
diff --git a/test/integration/distributed/inspect/common.go
b/test/integration/distributed/inspection/common.go
similarity index 99%
rename from test/integration/distributed/inspect/common.go
rename to test/integration/distributed/inspection/common.go
index c016a4993..3307b83c7 100644
--- a/test/integration/distributed/inspect/common.go
+++ b/test/integration/distributed/inspection/common.go
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-// Package inspect provides shared test setup for the inspect functionality in distributed mode.
-package inspect
+// Package inspection provides shared test setup for the inspect functionality in distributed mode.
+package inspection
import (
"context"
diff --git a/test/integration/distributed/inspect/etcd/suite_test.go
b/test/integration/distributed/inspection/etcd/suite_test.go
similarity index 94%
rename from test/integration/distributed/inspect/etcd/suite_test.go
rename to test/integration/distributed/inspection/etcd/suite_test.go
index 1638d69eb..e0791fba9 100644
--- a/test/integration/distributed/inspect/etcd/suite_test.go
+++ b/test/integration/distributed/inspection/etcd/suite_test.go
@@ -24,11 +24,11 @@ import (
. "github.com/onsi/gomega"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
-
"github.com/apache/skywalking-banyandb/test/integration/distributed/inspect"
+
"github.com/apache/skywalking-banyandb/test/integration/distributed/inspection"
)
func init() {
- inspect.SetupFunc = func() inspect.SetupResult {
+ inspection.SetupFunc = func() inspection.SetupResult {
By("Starting etcd server")
ep, _, etcdCleanup := setup.StartEmbeddedEtcd()
config := setup.EtcdClusterConfig(ep)
@@ -40,7 +40,7 @@ func init() {
By("Starting liaison node")
liaisonAddr, closerLiaisonNode := setup.LiaisonNode(config)
- return inspect.SetupResult{
+ return inspection.SetupResult{
LiaisonAddr: liaisonAddr,
EtcdEndpoint: ep,
StopFunc: func() {
diff --git a/test/integration/distributed/inspect/property/suite_test.go
b/test/integration/distributed/inspection/property/suite_test.go
similarity index 94%
rename from test/integration/distributed/inspect/property/suite_test.go
rename to test/integration/distributed/inspection/property/suite_test.go
index 161d60a76..a0b9ea1b1 100644
--- a/test/integration/distributed/inspect/property/suite_test.go
+++ b/test/integration/distributed/inspection/property/suite_test.go
@@ -25,11 +25,11 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
-
"github.com/apache/skywalking-banyandb/test/integration/distributed/inspect"
+
"github.com/apache/skywalking-banyandb/test/integration/distributed/inspection"
)
func init() {
- inspect.SetupFunc = func() inspect.SetupResult {
+ inspection.SetupFunc = func() inspection.SetupResult {
tmpDir, tmpDirCleanup, tmpErr := test.NewSpace()
Expect(tmpErr).NotTo(HaveOccurred())
dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
@@ -41,7 +41,7 @@ func init() {
By("Starting liaison node")
liaisonAddr, closerLiaisonNode := setup.LiaisonNode(config)
- return inspect.SetupResult{
+ return inspection.SetupResult{
LiaisonAddr: liaisonAddr,
StopFunc: func() {
closerLiaisonNode()
diff --git a/test/integration/standalone/deletion/deletion_suite_test.go
b/test/integration/standalone/deletion/deletion_suite_test.go
new file mode 100644
index 000000000..4003682a1
--- /dev/null
+++ b/test/integration/standalone/deletion/deletion_suite_test.go
@@ -0,0 +1,225 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_deletion_test provides integration tests for group deletion in standalone mode.
+package integration_deletion_test
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ g "github.com/onsi/ginkgo/v2"
+ gm "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/status"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ integration_standalone
"github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
+func TestDeletion(t *testing.T) {
+ gm.RegisterFailHandler(g.Fail)
+ g.RunSpecs(t, "Integration Deletion Suite",
g.Label(integration_standalone.Labels...))
+}
+
+var _ = g.BeforeSuite(func() {
+ gm.Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(gm.Succeed())
+})
+
+var _ = g.Describe("GroupDeletion", func() {
+ var goods []gleak.Goroutine
+ var conn *grpclib.ClientConn
+ var deferFn func()
+ var dataPath string
+ var groupClient databasev1.GroupRegistryServiceClient
+
+ g.BeforeEach(func() {
+ goods = gleak.Goroutines()
+ var pathDeferFn func()
+ var spaceErr error
+ dataPath, pathDeferFn, spaceErr = test.NewSpace()
+ gm.Expect(spaceErr).NotTo(gm.HaveOccurred())
+ ports, portsErr := test.AllocateFreePorts(4)
+ gm.Expect(portsErr).NotTo(gm.HaveOccurred())
+ var addr string
+ var serverCloseFn func()
+ addr, _, serverCloseFn = setup.ClosableStandalone(nil,
dataPath, ports)
+ var connErr error
+ conn, connErr = grpchelper.Conn(addr, 10*time.Second,
grpclib.WithTransportCredentials(insecure.NewCredentials()))
+ gm.Expect(connErr).NotTo(gm.HaveOccurred())
+ groupClient = databasev1.NewGroupRegistryServiceClient(conn)
+ deferFn = func() {
+ serverCloseFn()
+ pathDeferFn()
+ }
+ })
+
+ g.AfterEach(func() {
+ _ = conn.Close()
+ deferFn()
+ gm.Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ })
+
+ g.It("returns NotFound when deleting a nonexistent group", func() {
+ _, err := groupClient.Delete(context.TODO(),
&databasev1.GroupRegistryServiceDeleteRequest{
+ Group: "nonexistent-group-xyz",
+ })
+ gm.Expect(err).Should(gm.HaveOccurred())
+ errStatus, ok := status.FromError(err)
+ gm.Expect(ok).To(gm.BeTrue())
+ gm.Expect(errStatus.Code()).To(gm.Equal(codes.NotFound))
+ })
+
+ g.It("can delete an empty group without force flag", func() {
+ const newGroup = "empty-test-group"
+ streamDir := filepath.Join(dataPath, "stream", "data", newGroup)
+ g.By("Creating a new empty group")
+ _, err := groupClient.Create(context.TODO(),
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{Name: newGroup},
+ Catalog: commonv1.Catalog_CATALOG_STREAM,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+ g.By("Verifying the group exists")
+ getResp, err := groupClient.Get(context.TODO(),
&databasev1.GroupRegistryServiceGetRequest{Group: newGroup})
+ gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
gm.Expect(getResp.GetGroup().GetMetadata().GetName()).To(gm.Equal(newGroup))
+
+ g.By("Verifying group data directory exists on disk")
+ gm.Eventually(func() bool {
+ _, statErr := os.Stat(streamDir)
+ return statErr == nil
+ }, flags.EventuallyTimeout).Should(gm.BeTrue())
+
+ g.By("Deleting the empty group without force")
+ gm.Eventually(func() error {
+ _, deleteErr := groupClient.Delete(context.TODO(),
&databasev1.GroupRegistryServiceDeleteRequest{
+ Group: newGroup,
+ })
+ return deleteErr
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+ g.By("Verifying the group is eventually removed")
+ gm.Eventually(func() codes.Code {
+ _, getErr := groupClient.Get(context.TODO(),
&databasev1.GroupRegistryServiceGetRequest{Group: newGroup})
+ if getErr == nil {
+ return codes.OK
+ }
+ st, _ := status.FromError(getErr)
+ return st.Code()
+ }, flags.EventuallyTimeout).Should(gm.Equal(codes.NotFound))
+
+ g.By("Verifying group data directory is removed from disk")
+ gm.Eventually(func() bool {
+ _, statErr := os.Stat(streamDir)
+ return os.IsNotExist(statErr)
+ }, flags.EventuallyTimeout).Should(gm.BeTrue())
+ })
+
+ g.It("can delete an existing group with force=true", func() {
+ const groupName = "sw_metric"
+ measureDir := filepath.Join(dataPath, "measure", "data",
groupName)
+ g.By("Verifying the group exists with resources")
+ _, err := groupClient.Get(context.TODO(),
&databasev1.GroupRegistryServiceGetRequest{Group: groupName})
+ gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+ g.By("Verifying group data directory exists on disk")
+ gm.Eventually(func() bool {
+ _, statErr := os.Stat(measureDir)
+ return statErr == nil
+ }, flags.EventuallyTimeout).Should(gm.BeTrue())
+
+ g.By("Deleting group with force=true")
+ _, err = groupClient.Delete(context.TODO(),
&databasev1.GroupRegistryServiceDeleteRequest{
+ Group: groupName,
+ Force: true,
+ })
+ gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+ g.By("Verifying the group is eventually removed")
+ gm.Eventually(func() codes.Code {
+ _, getErr := groupClient.Get(context.TODO(),
&databasev1.GroupRegistryServiceGetRequest{Group: groupName})
+ if getErr == nil {
+ return codes.OK
+ }
+ st, _ := status.FromError(getErr)
+ return st.Code()
+ }, flags.EventuallyTimeout).Should(gm.Equal(codes.NotFound))
+
+ g.By("Verifying group data directory is removed from disk")
+ gm.Eventually(func() bool {
+ _, statErr := os.Stat(measureDir)
+ return os.IsNotExist(statErr)
+ }, flags.EventuallyTimeout).Should(gm.BeTrue())
+ })
+
+ g.It("can query deletion task status until completed", func() {
+ const groupName = "sw_metric"
+ measureDir := filepath.Join(dataPath, "measure", "data",
groupName)
+ g.By("Verifying group data directory exists on disk before
deletion")
+ gm.Eventually(func() bool {
+ _, statErr := os.Stat(measureDir)
+ return statErr == nil
+ }, flags.EventuallyTimeout).Should(gm.BeTrue())
+
+ g.By("Initiating group deletion with force=true")
+ _, err := groupClient.Delete(context.TODO(),
&databasev1.GroupRegistryServiceDeleteRequest{
+ Group: groupName,
+ Force: true,
+ })
+ gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+ g.By("Waiting for deletion task to reach COMPLETED")
+ gm.Eventually(func() databasev1.GroupDeletionTask_Phase {
+ queryResp, queryErr :=
groupClient.Query(context.TODO(), &databasev1.GroupRegistryServiceQueryRequest{
+ Group: groupName,
+ })
+ if queryErr != nil {
+ return
databasev1.GroupDeletionTask_PHASE_UNSPECIFIED
+ }
+ return queryResp.GetTask().GetCurrentPhase()
+ }, flags.EventuallyTimeout,
100*time.Millisecond).Should(gm.Equal(databasev1.GroupDeletionTask_PHASE_COMPLETED))
+ })
+})
diff --git a/test/integration/standalone/inspect/common.go
b/test/integration/standalone/inspection/common.go
similarity index 99%
rename from test/integration/standalone/inspect/common.go
rename to test/integration/standalone/inspection/common.go
index f4e138e36..6d40c68f1 100644
--- a/test/integration/standalone/inspect/common.go
+++ b/test/integration/standalone/inspection/common.go
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-// Package inspect provides integration tests for the inspect functionality in standalone mode.
-package inspect
+// Package inspection provides integration tests for the inspect functionality in standalone mode.
+package inspection
import (
"context"
diff --git a/test/integration/standalone/inspect/etcd/suite_test.go
b/test/integration/standalone/inspection/etcd/suite_test.go
similarity index 93%
rename from test/integration/standalone/inspect/etcd/suite_test.go
rename to test/integration/standalone/inspection/etcd/suite_test.go
index f34bd4fa6..77d37b7c8 100644
--- a/test/integration/standalone/inspect/etcd/suite_test.go
+++ b/test/integration/standalone/inspection/etcd/suite_test.go
@@ -25,14 +25,14 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/setup"
integration_standalone
"github.com/apache/skywalking-banyandb/test/integration/standalone"
-
"github.com/apache/skywalking-banyandb/test/integration/standalone/inspect"
+
"github.com/apache/skywalking-banyandb/test/integration/standalone/inspection"
)
func init() {
- inspect.SetupFunc = func() inspect.SetupResult {
+ inspection.SetupFunc = func() inspection.SetupResult {
By("Starting standalone server")
addr, _, closeFn := setup.EmptyStandalone(nil)
- return inspect.SetupResult{
+ return inspection.SetupResult{
Addr: addr,
StopFunc: closeFn,
}
diff --git a/test/integration/standalone/inspect/property/suite_test.go
b/test/integration/standalone/inspection/property/suite_test.go
similarity index 94%
rename from test/integration/standalone/inspect/property/suite_test.go
rename to test/integration/standalone/inspection/property/suite_test.go
index 6edcacc54..f214bcbad 100644
--- a/test/integration/standalone/inspect/property/suite_test.go
+++ b/test/integration/standalone/inspection/property/suite_test.go
@@ -26,18 +26,18 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
integration_standalone
"github.com/apache/skywalking-banyandb/test/integration/standalone"
-
"github.com/apache/skywalking-banyandb/test/integration/standalone/inspect"
+
"github.com/apache/skywalking-banyandb/test/integration/standalone/inspection"
)
func init() {
- inspect.SetupFunc = func() inspect.SetupResult {
+ inspection.SetupFunc = func() inspection.SetupResult {
By("Starting standalone server with property mode")
tmpDir, tmpDirCleanup, tmpErr := test.NewSpace()
Expect(tmpErr).NotTo(HaveOccurred())
dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
config := setup.PropertyClusterConfig(dfWriter)
addr, _, closeFn := setup.EmptyStandalone(config)
- return inspect.SetupResult{
+ return inspection.SetupResult{
Addr: addr,
StopFunc: func() {
closeFn()