Copilot commented on code in PR #978:
URL: 
https://github.com/apache/skywalking-banyandb/pull/978#discussion_r2853582646


##########
banyand/property/service.go:
##########
@@ -204,6 +215,33 @@ func (s *service) PreRun(ctx context.Context) error {
        )
 }
 
+type propertyGroupEventHandler struct {
+       svc *service
+}
+
+func (h *propertyGroupEventHandler) OnInit([]schema.Kind) (bool, []int64) {
+       return false, nil
+}
+
+func (h *propertyGroupEventHandler) OnAddOrUpdate(_ schema.Metadata) {}
+
+func (h *propertyGroupEventHandler) OnDelete(md schema.Metadata) {
+       if md.Kind != schema.KindGroup {
+               return
+       }
+       group := md.Spec.(*commonv1.Group)
+       if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
+               return
+       }
+       groupName := group.Metadata.GetName()
+       if dropErr := h.svc.db.Drop(groupName); dropErr != nil {
+               h.svc.l.Error().Err(dropErr).Str("group", 
groupName).Msg("failed to drop group")
+       }
+       if ch, loaded := h.svc.pendingGroupDrops.LoadAndDelete(groupName); 
loaded {
+               close(ch.(chan struct{}))
+       }

Review Comment:
   The drop-notification channel is closed even when `db.Drop(groupName)` 
fails, which can signal "drop completed" to waiters while data is still 
present. Consider only closing the channel on successful drop, and/or recording 
the drop error so the deletion task can fail instead of completing.



##########
banyand/liaison/grpc/measure.go:
##########
@@ -358,6 +366,16 @@ func (ms *measureService) handleWriteCleanup(publisher 
queue.BatchPublisher, suc
 var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: 
make([]*measurev1.DataPoint, 0)}
 
 func (ms *measureService) Query(ctx context.Context, req 
*measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr != 
nil {
+                       return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
+               }
+       }
+       defer func() {
+               for _, g := range req.Groups {

Review Comment:
   If `acquireRequest` fails partway through iterating `req.Groups`, earlier 
successful acquisitions are leaked because the function returns immediately. 
This can block group deletion indefinitely. Release already-acquired groups 
before returning an error.
   ```suggestion
        acquiredGroups := make([]string, 0, len(req.Groups))
        for _, g := range req.Groups {
                if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr != 
nil {
                        for _, ag := range acquiredGroups {
                                ms.groupRepo.releaseRequest(ag)
                        }
                        return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
                }
                acquiredGroups = append(acquiredGroups, g)
        }
        defer func() {
                for _, g := range acquiredGroups {
   ```



##########
banyand/property/service.go:
##########
@@ -122,6 +126,12 @@ func (s *service) Validate() error {
        return nil
 }
 
+func (s *service) SubscribeGroupDrop(groupName string) <-chan struct{} {

Review Comment:
   `SubscribeGroupDrop` stores a new channel unconditionally. If called 
multiple times for the same group, earlier subscribers will block forever 
because their channel is replaced and never closed. Consider returning an 
existing channel if present or closing the previous one before replacing it.
   ```suggestion
   func (s *service) SubscribeGroupDrop(groupName string) <-chan struct{} {
        if existing, ok := s.pendingGroupDrops.Load(groupName); ok {
                if ch, ok := existing.(chan struct{}); ok {
                        return ch
                }
        }
   ```



##########
pkg/schema/cache.go:
##########
@@ -108,6 +109,13 @@ func (sr *schemaRepo) StopCh() <-chan struct{} {
        return sr.closer.CloseNotify()
 }
 
+// SubscribeGroupDrop returns a channel that is closed after the group's 
physical storage is dropped.
+func (sr *schemaRepo) SubscribeGroupDrop(groupName string) <-chan struct{} {
+       ch := make(chan struct{})
+       sr.pendingGroupDrops.Store(groupName, ch)
+       return ch

Review Comment:
   `SubscribeGroupDrop` overwrites any existing channel for the same group name 
without closing/returning the prior one. A previous waiter would then block 
forever and the map entry can leak if the group is never deleted. Consider 
returning an existing channel if present, or closing/replacing the previous 
channel explicitly (and/or adding an unsubscribe mechanism).



##########
banyand/liaison/grpc/deletion.go:
##########
@@ -0,0 +1,408 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "deletion_task"
+       taskDataTagName                  = "task_data"
+)
+
+type propertyApplier interface {
+       Apply(ctx context.Context, req *propertyv1.ApplyRequest) 
(*propertyv1.ApplyResponse, error)
+       Query(ctx context.Context, req *propertyv1.QueryRequest) 
(*propertyv1.QueryResponse, error)
+}
+
+// GroupDropSubscriber defines an interface for subscribing to group drop 
events.
+type GroupDropSubscriber interface {
+       SubscribeGroupDrop(catalog commonv1.Catalog, groupName string) <-chan 
struct{}
+}
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     propertyApplier
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       dropSubscriber GroupDropSubscriber
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(
+       schemaRegistry metadata.Repo, propServer *propertyServer, gr 
*groupRepo, dropSubscriber GroupDropSubscriber, l *logger.Logger,
+) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               dropSubscriber: dropSubscriber,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }

Review Comment:
   `initPropertyStorage` treats any `GetGroup`/`GetProperty` error as "not 
found" and proceeds to create the internal resources. If the error is transient 
(e.g., permission/network), this can mask the real issue and attempt an invalid 
create. Only create on a confirmed NotFound (e.g., `errors.As(err, 
schema.ErrGRPCResourceNotFound)` / gRPC `codes.NotFound`), and return the 
original error otherwise.



##########
banyand/liaison/grpc/measure.go:
##########
@@ -424,6 +442,16 @@ func (ms *measureService) Query(ctx context.Context, req 
*measurev1.QueryRequest
 }
 
 func (ms *measureService) TopN(ctx context.Context, topNRequest 
*measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
+       for _, g := range topNRequest.GetGroups() {
+               if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr != 
nil {
+                       return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
+               }
+       }
+       defer func() {
+               for _, g := range topNRequest.GetGroups() {

Review Comment:
   In `TopN`, an `acquireRequest` failure returns immediately without releasing 
groups already acquired earlier in the loop. This leaks the in-flight reference 
and can block deletion. Acquire with rollback/release-on-error.
   ```suggestion
        acquiredGroups := make([]string, 0, len(topNRequest.GetGroups()))
        for _, g := range topNRequest.GetGroups() {
                if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr != 
nil {
                        for _, ag := range acquiredGroups {
                                ms.groupRepo.releaseRequest(ag)
                        }
                        return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
                }
                acquiredGroups = append(acquiredGroups, g)
        }
        defer func() {
                for _, g := range acquiredGroups {
   ```



##########
banyand/liaison/grpc/stream.go:
##########
@@ -320,6 +329,16 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
 var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements: 
make([]*streamv1.Element, 0)}
 
 func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) 
(resp *streamv1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               if acquireErr := s.groupRepo.acquireRequest(g); acquireErr != 
nil {
+                       return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
+               }

Review Comment:
   If `acquireRequest` fails for a later group in `req.Groups`, any groups 
acquired earlier in this loop are never released because the function returns 
immediately. This can leak in-flight counters and block pending deletions. 
Track which groups were successfully acquired and release them before returning 
on error.
   ```suggestion
   func (s *streamService) Query(ctx context.Context, req 
*streamv1.QueryRequest) (resp *streamv1.QueryResponse, err error) {
        acquiredGroups := make([]string, 0, len(req.Groups))
        for _, g := range req.Groups {
                if acquireErr := s.groupRepo.acquireRequest(g); acquireErr != 
nil {
                        for _, ag := range acquiredGroups {
                                s.groupRepo.releaseRequest(ag)
                        }
                        return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
                }
                acquiredGroups = append(acquiredGroups, g)
   ```



##########
banyand/liaison/grpc/property.go:
##########
@@ -415,6 +424,16 @@ func (ps *propertyServer) Delete(ctx context.Context, req 
*propertyv1.DeleteRequ
 }
 
 func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryRequest) (resp *propertyv1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr != 
nil {

Review Comment:
   If `acquireRequest` fails for one of the requested groups, earlier acquired 
groups are not released due to the immediate return. This leaks the in-flight 
count and can cause group deletion to hang. Track acquired groups and release 
them before returning on error.
   ```suggestion
        for i, g := range req.Groups {
                if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr != 
nil {
                        // Release all groups that were successfully acquired 
before this failure.
                        for j := 0; j < i; j++ {
                                ps.groupRepo.releaseRequest(req.Groups[j])
                        }
   ```



##########
api/proto/banyandb/database/v1/rpc.proto:
##########
@@ -358,21 +358,13 @@ message GroupRegistryServiceUpdateResponse {}
 message GroupRegistryServiceDeleteRequest {
   // group is the name of the group to delete.
   string group = 1;
-  // dry_run indicates whether to perform a dry run without actually deleting 
data.
-  // When true, returns what would be deleted without making changes.
-  bool dry_run = 2;
   // force indicates whether to force delete the group even if it contains 
data.
   // When false, deletion will fail if the group is not empty.
-  bool force = 3;
+  bool force = 2;
 }

Review Comment:
   In protobuf, field numbers must never be reused. Reassigning `force` from 
tag 3 to tag 2 (previously `dry_run`) will cause old clients to have 
`dry_run=true` interpreted as `force=true` and breaks wire compatibility. Keep 
`force = 3` and mark tag 2 as `reserved` (and keep `dry_run` removed) instead.



##########
banyand/liaison/grpc/deletion.go:
##########
@@ -0,0 +1,408 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "deletion_task"
+       taskDataTagName                  = "task_data"
+)
+
+type propertyApplier interface {
+       Apply(ctx context.Context, req *propertyv1.ApplyRequest) 
(*propertyv1.ApplyResponse, error)
+       Query(ctx context.Context, req *propertyv1.QueryRequest) 
(*propertyv1.QueryResponse, error)
+}
+
+// GroupDropSubscriber defines an interface for subscribing to group drop 
events.
+type GroupDropSubscriber interface {
+       SubscribeGroupDrop(catalog commonv1.Catalog, groupName string) <-chan 
struct{}
+}
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     propertyApplier
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       dropSubscriber GroupDropSubscriber
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(
+       schemaRegistry metadata.Repo, propServer *propertyServer, gr 
*groupRepo, dropSubscriber GroupDropSubscriber, l *logger.Logger,
+) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               dropSubscriber: dropSubscriber,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group 
string) error {
+       if _, loaded := m.tasks.LoadOrStore(group, true); loaded {
+               return fmt.Errorf("deletion task for group %s is already in 
progress", group)
+       }
+       task := &databasev1.GroupDeletionTask{
+               CurrentPhase:  databasev1.GroupDeletionTask_PHASE_PENDING,
+               TotalCounts:   make(map[string]int32),
+               DeletedCounts: make(map[string]int32),
+               CreatedAt:     timestamppb.Now(),
+       }
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to collect data info for group %s: 
%w", group, dataErr)
+       }
+       var totalDataSize int64
+       for _, di := range dataInfo {
+               totalDataSize += di.GetDataSizeBytes()
+       }
+       task.TotalDataSizeBytes = totalDataSize
+       if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to save initial deletion task for 
group %s: %w", group, saveErr)
+       }
+       go m.executeDeletion(context.WithoutCancel(ctx), group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) executeDeletion(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       defer m.tasks.Delete(group)
+       opt := schema.ListOpt{Group: group}
+
+       task.Message = "waiting for in-flight requests to complete"
+       m.saveProgress(ctx, group, task)
+       done := m.groupRepo.startPendingDeletion(group)
+       defer m.groupRepo.clearPendingDeletion(group)
+       <-done
+
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_IN_PROGRESS
+
+       type deletionStep struct {
+               fn      func() error
+               message string
+       }
+       steps := []deletionStep{
+               {func() error { return m.deleteIndexRuleBindings(ctx, opt, 
task) }, "deleting index rule bindings"},
+               {func() error { return m.deleteIndexRules(ctx, opt, task) }, 
"deleting index rules"},
+               {func() error { return m.deleteProperties(ctx, opt, task) }, 
"deleting properties"},
+               {func() error { return m.deleteStreams(ctx, opt, task) }, 
"deleting streams"},
+               {func() error { return m.deleteMeasures(ctx, opt, task) }, 
"deleting measures"},
+               {func() error { return m.deleteTraces(ctx, opt, task) }, 
"deleting traces"},
+               {func() error { return m.deleteTopNAggregations(ctx, opt, task) 
}, "deleting topN aggregations"},
+       }
+       for _, step := range steps {
+               if stepErr := m.runStep(ctx, group, task, step.message, 
step.fn); stepErr != nil {
+                       return
+               }
+       }
+
+       task.Message = "deleting group and data files"
+       var dropCh <-chan struct{}
+       if m.dropSubscriber != nil {
+               if groupMeta, getErr := 
m.schemaRegistry.GroupRegistry().GetGroup(ctx, group); getErr == nil {
+                       dropCh = 
m.dropSubscriber.SubscribeGroupDrop(groupMeta.Catalog, group)
+               }
+       }
+       _, deleteGroupErr := m.schemaRegistry.GroupRegistry().DeleteGroup(ctx, 
group)
+       if deleteGroupErr != nil {
+               m.failTask(ctx, group, task, fmt.Sprintf("failed to delete 
group: %v", deleteGroupErr))
+               if dropCh != nil {
+                       go func() { <-dropCh }()

Review Comment:
   On `DeleteGroup` error, spawning a goroutine to wait on `dropCh` can leak 
forever because the drop channel is only closed on a successful group-delete 
event. It also leaves the subscriber's pending-drop entry around. Prefer 
unsubscribing/closing the drop subscription on error (or only subscribing after 
`DeleteGroup` succeeds), or wait with a bounded timeout/context and clean up 
the subscription.
   ```suggestion
                        select {
                        case <-dropCh:
                        default:
                        }
   ```



##########
banyand/property/db/db.go:
##########
@@ -374,6 +376,27 @@ func (db *database) getShard(group string, id 
common.ShardID) (*shard, bool) {
        return nil, false
 }
 
+// Drop closes and removes all shards for the given group and deletes the 
group directory.
+func (db *database) Drop(groupName string) error {
+       value, ok := db.groups.LoadAndDelete(groupName)
+       if !ok {
+               return nil
+       }
+       gs := value.(*groupShards)
+       sLst := gs.shards.Load()
+       if sLst != nil {
+               var err error
+               for _, s := range *sLst {
+                       multierr.AppendInto(&err, s.close())
+               }
+               if err != nil {
+                       return err
+               }
+       }
+       db.lfs.MustRMAll(gs.location)
+       return nil

Review Comment:
   `Drop` uses `MustRMAll`, which can panic and crash the process if the 
filesystem removal fails. Other new Drop implementations recover and return an 
error instead. Prefer a non-panicking removal (or wrap `MustRMAll` with 
`defer`/`recover`) so `Drop` reliably returns an error rather than taking down 
the node.



##########
banyand/liaison/grpc/trace.go:
##########
@@ -409,6 +418,16 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
 var emptyTraceQueryResponse = &tracev1.QueryResponse{Traces: 
make([]*tracev1.Trace, 0)}
 
 func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest) 
(resp *tracev1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               if acquireErr := s.groupRepo.acquireRequest(g); acquireErr != 
nil {

Review Comment:
   Same issue as in stream Query: if `acquireRequest` fails for a later group, 
previously acquired groups are not released due to the early return. This can 
prevent group deletion from ever completing. Consider acquiring with rollback 
(release already-acquired groups) on failure.
   ```suggestion
        for i, g := range req.Groups {
                if acquireErr := s.groupRepo.acquireRequest(g); acquireErr != 
nil {
                        // Roll back previously acquired groups before 
returning the error.
                        for j := 0; j < i; j++ {
                                s.groupRepo.releaseRequest(req.Groups[j])
                        }
   ```



##########
pkg/schema/cache.go:
##########
@@ -197,6 +205,9 @@ func (sr *schemaRepo) Watcher() {
                                                switch evt.Kind {
                                                case EventKindGroup:
                                                        err = 
sr.deleteGroup(evt.Metadata.GetMetadata())
+                                                       if dropCh, loaded := 
sr.pendingGroupDrops.LoadAndDelete(evt.Metadata.GetMetadata().GetName()); 
loaded {
+                                                               
close(dropCh.(chan struct{}))
+                                                       }

Review Comment:
   The drop notification channel is closed regardless of whether `deleteGroup` 
succeeded. If `g.drop()` fails, listeners will be notified even though storage 
may still exist, and the deletion task can incorrectly mark completion. Only 
close the drop channel after `deleteGroup` returns nil; if it fails, keep the 
subscription and let callers observe/handle the failure (or propagate an error).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to