This is an automated email from the ASF dual-hosted git repository. alinsran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push: new 07672cce fix: deadlock occurs when updating configuration fails (#2531) 07672cce is described below commit 07672ccea3d4082fc6371ffea0691359178f6b83 Author: AlinsRan <alins...@apache.org> AuthorDate: Thu Aug 28 10:00:38 2025 +0800 fix: deadlock occurs when updating configuration fails (#2531) --- internal/adc/client/client.go | 145 +++++++++++++++++------------------------- 1 file changed, 60 insertions(+), 85 deletions(-) diff --git a/internal/adc/client/client.go b/internal/adc/client/client.go index 829faee7..3b99bd34 100644 --- a/internal/adc/client/client.go +++ b/internal/adc/client/client.go @@ -72,119 +72,94 @@ type Task struct { Resources *adctypes.Resources } -func (d *Client) Update(ctx context.Context, args Task) error { +type StoreDelta struct { + Deleted map[types.NamespacedNameKind]adctypes.Config + Applied map[types.NamespacedNameKind]adctypes.Config +} + +func (d *Client) applyStoreChanges(args Task, isDelete bool) (StoreDelta, error) { d.mu.Lock() - deleteConfigs := d.ConfigManager.Update(args.Key, args.Configs) - for _, config := range deleteConfigs { - if err := d.Store.Delete(config.Name, args.ResourceTypes, args.Labels); err != nil { - log.Errorw("failed to delete resources from store", - zap.String("name", config.Name), - zap.Error(err), - ) - return err + defer d.mu.Unlock() + + var delta StoreDelta + + if isDelete { + delta.Deleted = d.ConfigManager.Get(args.Key) + d.ConfigManager.Delete(args.Key) + } else { + deleted := d.ConfigManager.Update(args.Key, args.Configs) + delta.Deleted = deleted + delta.Applied = args.Configs + } + + for _, cfg := range delta.Deleted { + if err := d.Store.Delete(cfg.Name, args.ResourceTypes, args.Labels); err != nil { + log.Errorw("store delete failed", zap.Error(err), zap.Any("cfg", cfg), zap.Any("args", args)) + return StoreDelta{}, errors.Wrap(err, fmt.Sprintf("store delete failed for config %s", cfg.Name)) } } - for _, config := range args.Configs { - if err := d.Insert(config.Name, args.ResourceTypes, args.Resources, args.Labels); err != nil { - log.Errorw("failed to insert resources into store", - zap.String("name", config.Name), - zap.Error(err), - ) - return err + for _, cfg := range delta.Applied { + if err := d.Insert(cfg.Name, args.ResourceTypes, args.Resources, args.Labels); err != nil { + log.Errorw("store insert failed", zap.Error(err), zap.Any("cfg", cfg), zap.Any("args", args)) + return StoreDelta{}, errors.Wrap(err, fmt.Sprintf("store insert failed for config %s", cfg.Name)) } } - d.mu.Unlock() + return delta, nil +} + +func (d *Client) applySync(ctx context.Context, args Task, delta StoreDelta) error { d.syncMu.RLock() defer d.syncMu.RUnlock() - if len(deleteConfigs) > 0 { - err := d.sync(ctx, Task{ + if len(delta.Deleted) > 0 { + if err := d.sync(ctx, Task{ Name: args.Name, Labels: args.Labels, ResourceTypes: args.ResourceTypes, - Configs: deleteConfigs, - }) - if err != nil { + Configs: delta.Deleted, + }); err != nil { log.Warnw("failed to sync deleted configs", zap.Error(err)) } } - return d.sync(ctx, args) + if len(delta.Applied) > 0 { + return d.sync(ctx, Task{ + Name: args.Name, + Labels: args.Labels, + ResourceTypes: args.ResourceTypes, + Configs: delta.Applied, + Resources: args.Resources, + }) + } + return nil } -func (d *Client) UpdateConfig(ctx context.Context, args Task) error { - d.mu.Lock() - defer d.mu.Unlock() - deleteConfigs := d.ConfigManager.Update(args.Key, args.Configs) - - for _, config := range deleteConfigs { - if err := d.Store.Delete(config.Name, args.ResourceTypes, args.Labels); err != nil { - log.Errorw("failed to delete resources from store", - zap.String("name", config.Name), - zap.Error(err), - ) - return err - } +func (d *Client) Update(ctx context.Context, args Task) error { + delta, err := d.applyStoreChanges(args, false) + if err != nil { + return err } + return d.applySync(ctx, args, delta) +} - for _, config := range args.Configs { - if err := d.Insert(config.Name, args.ResourceTypes, args.Resources, args.Labels); err != nil { - log.Errorw("failed to insert resources into store", - zap.String("name", config.Name), - zap.Error(err), - ) - return err - } - } - return nil +func (d *Client) UpdateConfig(ctx context.Context, args Task) error { + _, err := d.applyStoreChanges(args, false) + return err } func (d *Client) Delete(ctx context.Context, args Task) error { - d.mu.Lock() - configs := d.ConfigManager.Get(args.Key) - d.ConfigManager.Delete(args.Key) - - for _, config := range configs { - if err := d.Store.Delete(config.Name, args.ResourceTypes, args.Labels); err != nil { - log.Errorw("failed to delete resources from store", - zap.String("name", config.Name), - zap.Error(err), - ) - return err - } + delta, err := d.applyStoreChanges(args, true) + if err != nil { + return err } - d.mu.Unlock() - - d.syncMu.RLock() - defer d.syncMu.RUnlock() - - return d.sync(ctx, Task{ - Labels: args.Labels, - ResourceTypes: args.ResourceTypes, - Configs: configs, - }) + return d.applySync(ctx, args, delta) } func (d *Client) DeleteConfig(ctx context.Context, args Task) error { - d.mu.Lock() - defer d.mu.Unlock() - - configs := d.ConfigManager.Get(args.Key) - d.ConfigManager.Delete(args.Key) - - for _, config := range configs { - if err := d.Store.Delete(config.Name, args.ResourceTypes, args.Labels); err != nil { - log.Errorw("failed to delete resources from store", - zap.String("name", config.Name), - zap.Error(err), - ) - return err - } - } - - return nil + _, err := d.applyStoreChanges(args, true) + return err } func (c *Client) Sync(ctx context.Context) (map[string]types.ADCExecutionErrors, error) {