This is an automated email from the ASF dual-hosted git repository. alinsran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push: new 7c926e66 feat: support retry in case of sync failure (#2534) 7c926e66 is described below commit 7c926e66b70256c5e132863ab0b3e2ec7fb4043d Author: AlinsRan <alins...@apache.org> AuthorDate: Tue Sep 2 08:59:31 2025 +0800 feat: support retry in case of sync failure (#2534) --- internal/provider/apisix/provider.go | 41 +++++++------ internal/provider/common/retrier.go | 96 +++++++++++++++++++++++++++++++ test/e2e/crds/v2/route.go | 55 ++++++++++++++++++ test/e2e/crds/v2/status.go | 42 +++++++++++--- test/e2e/framework/manifests/ingress.yaml | 2 +- test/e2e/scaffold/scaffold.go | 23 ++++++++ 6 files changed, 233 insertions(+), 26 deletions(-) diff --git a/internal/provider/apisix/provider.go b/internal/provider/apisix/provider.go index 0246b3e7..f6a8f876 100644 --- a/internal/provider/apisix/provider.go +++ b/internal/provider/apisix/provider.go @@ -37,11 +37,19 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/provider/common" "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" ) -const ProviderTypeAPISIX = "apisix" +const ( + ProviderTypeAPISIX = "apisix" + + RetryBaseDelay = 1 * time.Second + RetryMaxDelay = 1000 * time.Second + + MinSyncPeriod = 1 * time.Second +) type apisixProvider struct { provider.Options @@ -223,33 +231,32 @@ func (d *apisixProvider) Start(ctx context.Context) error { initalSyncDelay := d.InitSyncDelay if initalSyncDelay > 0 { - time.AfterFunc(initalSyncDelay, func() { - if err := d.sync(ctx); err != nil { - log.Error(err) - return - } - }) + time.AfterFunc(initalSyncDelay, d.syncNotify) } - if d.SyncPeriod < 1 { - return nil + syncPeriod := d.SyncPeriod + if syncPeriod < MinSyncPeriod { + syncPeriod = MinSyncPeriod } - ticker := time.NewTicker(d.SyncPeriod) + ticker := time.NewTicker(syncPeriod) defer ticker.Stop() + + retrier := common.NewRetrier(common.NewExponentialBackoff(RetryBaseDelay, RetryMaxDelay)) + for { - synced := false select { case <-d.syncCh: - synced = true case <-ticker.C: - synced = true + case <-retrier.C(): case <-ctx.Done(): + retrier.Reset() return nil } - if synced { - if err := d.sync(ctx); err != nil { - log.Error(err) - } + if err := d.sync(ctx); err != nil { + log.Error(err) + retrier.Next() + } else { + retrier.Reset() } } } diff --git a/internal/provider/common/retrier.go b/internal/provider/common/retrier.go new file mode 100644 index 00000000..1277ee93 --- /dev/null +++ b/internal/provider/common/retrier.go @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package common + +import ( + "sync" + "time" +) + +type Backoff interface { + Next() time.Duration + Reset() +} + +type ExponentialBackoff struct { + base, max, current time.Duration +} + +func NewExponentialBackoff(base, max time.Duration) *ExponentialBackoff { + return &ExponentialBackoff{base: base, max: max, current: base} +} + +func (b *ExponentialBackoff) Next() time.Duration { + delay := b.current + b.current *= 2 + if b.current > b.max { + b.current = b.max + } + return delay +} + +func (b *ExponentialBackoff) Reset() { + b.current = b.base +} + +type Retrier struct { + mu sync.Mutex + ch chan struct{} + timer *time.Timer + backoff Backoff +} + +func NewRetrier(b Backoff) *Retrier { + return &Retrier{ + ch: make(chan struct{}, 1), + backoff: b, + } +} + +func (r *Retrier) Reset() { + r.mu.Lock() + defer r.mu.Unlock() + + if r.timer != nil { + r.timer.Stop() + r.timer = nil + } + r.backoff.Reset() +} + +func (r *Retrier) Next() { + r.mu.Lock() + defer r.mu.Unlock() + + if r.timer != nil { + r.timer.Stop() + r.timer = nil + } + + delay := r.backoff.Next() + r.timer = time.AfterFunc(delay, func() { + select { + case r.ch <- struct{}{}: + default: + } + }) +} + +func (r *Retrier) C() <-chan struct{} { + return r.ch +} diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go index 605a7179..ac1e47c7 100644 --- a/test/e2e/crds/v2/route.go +++ b/test/e2e/crds/v2/route.go @@ -1676,4 +1676,59 @@ spec: }) }) }) + + Context("Exception Test", func() { + const apisixRouteSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: %s + http: + - name: rule0 + match: + hosts: + - httpbin + paths: + - /* + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + It("try again when sync failed", func() { + s.Deployer.ScaleDataplane(0) + + err := s.CreateResourceFromString(fmt.Sprintf(apisixRouteSpec, s.Namespace())) + Expect(err).NotTo(HaveOccurred(), "creating ApisixRoute") + + By("check ApisixRoute status") + s.RetryAssertion(func() string { + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace()) + return output + }).WithTimeout(30 * time.Second). + Should( + And( + ContainSubstring(`status: "False"`), + ContainSubstring(`reason: SyncFailed`), + ), + ) + + s.Deployer.ScaleDataplane(1) + + s.RetryAssertion(func() string { + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace()) + return output + }).WithTimeout(60 * time.Second). + Should(ContainSubstring(`status: "True"`)) + + By("check route in APISIX") + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(200), + }) + }) + }) }) diff --git a/test/e2e/crds/v2/status.go b/test/e2e/crds/v2/status.go index 06170a6d..7b7defe6 100644 --- a/test/e2e/crds/v2/status.go +++ b/test/e2e/crds/v2/status.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/yaml" @@ -41,7 +42,7 @@ var _ = Describe("Test CRD Status", Label("apisix.apache.org", "v2", "apisixrout Context("Test ApisixRoute Sync Status", func() { BeforeEach(func() { By("create GatewayProxy") - gatewayProxy := s.GetGatewayProxyYaml() + gatewayProxy := s.GetGatewayProxyWithServiceYaml() err := s.CreateResourceFromString(gatewayProxy) Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") time.Sleep(5 * time.Second) @@ -144,7 +145,8 @@ spec: It("dataplane unavailable", func() { By("apply ApisixRoute") - applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, fmt.Sprintf(ar, s.Namespace(), s.Namespace())) + arYaml := fmt.Sprintf(ar, s.Namespace(), s.Namespace()) + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, arYaml) By("check route in APISIX") s.RequestAssert(&scaffold.RequestAssert{ @@ -154,13 +156,28 @@ spec: Check: scaffold.WithExpectedStatus(200), }) - s.Deployer.ScaleDataplane(0) + By("get yaml from service") + serviceYaml, err := s.GetOutputFromString("svc", framework.ProviderType, "-o", "yaml") + Expect(err).NotTo(HaveOccurred(), "getting service yaml") + By("update service to type ExternalName with invalid host") + var k8sservice corev1.Service + err = yaml.Unmarshal([]byte(serviceYaml), &k8sservice) + Expect(err).NotTo(HaveOccurred(), "unmarshalling service") + oldSpec := k8sservice.Spec + k8sservice.Spec = corev1.ServiceSpec{ + Type: corev1.ServiceTypeExternalName, + ExternalName: "invalid.host", + } + newServiceYaml, err := yaml.Marshal(k8sservice) + Expect(err).NotTo(HaveOccurred(), "marshalling service") + err = s.CreateResourceFromString(string(newServiceYaml)) + Expect(err).NotTo(HaveOccurred(), "creating service") By("check ApisixRoute status") s.RetryAssertion(func() string { - output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace()) + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml") return output - }).WithTimeout(80 * time.Second). + }).WithTimeout(60 * time.Second). Should( And( ContainSubstring(`status: "False"`), @@ -168,13 +185,22 @@ spec: ), ) - s.Deployer.ScaleDataplane(1) + By("update service to original spec") + serviceYaml, err = s.GetOutputFromString("svc", framework.ProviderType, "-o", "yaml") + Expect(err).NotTo(HaveOccurred(), "getting service yaml") + err = yaml.Unmarshal([]byte(serviceYaml), &k8sservice) + Expect(err).NotTo(HaveOccurred(), "unmarshalling service") + k8sservice.Spec = oldSpec + newServiceYaml, err = yaml.Marshal(k8sservice) + Expect(err).NotTo(HaveOccurred(), "marshalling service") + err = s.CreateResourceFromString(string(newServiceYaml)) + Expect(err).NotTo(HaveOccurred(), "creating service") By("check ApisixRoute status after scaling up") s.RetryAssertion(func() string { - output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace()) + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml") return output - }).WithTimeout(80 * time.Second). + }).WithTimeout(60 * time.Second). Should( And( ContainSubstring(`status: "True"`), diff --git a/test/e2e/framework/manifests/ingress.yaml b/test/e2e/framework/manifests/ingress.yaml index 1ea6a88f..c411a93d 100644 --- a/test/e2e/framework/manifests/ingress.yaml +++ b/test/e2e/framework/manifests/ingress.yaml @@ -333,7 +333,7 @@ data: # The period between two consecutive syncs. # The default value is 0 seconds, which means the controller will not sync. # If you want to enable the sync, set it to a positive value. - init_sync_delay: {{ .InitSyncDelay | default "1m" }} + init_sync_delay: {{ .InitSyncDelay | default "20m" }} --- apiVersion: v1 kind: Service diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index 5d4509c2..9d8dfff1 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -460,6 +460,25 @@ spec: value: "%s" ` +const gatewayProxyWithServiceYaml = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: GatewayProxy +metadata: + name: %s + namespace: %s +spec: + provider: + type: ControlPlane + controlPlane: + service: + name: %s + port: 9180 + auth: + type: AdminKey + adminKey: + value: "%s" +` + const ingressClassYaml = ` apiVersion: networking.k8s.io/v1 kind: IngressClass @@ -479,6 +498,10 @@ func (s *Scaffold) GetGatewayProxyYaml() string { return fmt.Sprintf(gatewayProxyYaml, s.namespace, s.namespace, s.Deployer.GetAdminEndpoint(), s.AdminKey()) } +func (s *Scaffold) GetGatewayProxyWithServiceYaml() string { + return fmt.Sprintf(gatewayProxyWithServiceYaml, s.namespace, s.namespace, s.dataplaneService.Name, s.AdminKey()) +} + func (s *Scaffold) GetIngressClassYaml() string { return fmt.Sprintf(ingressClassYaml, s.namespace, s.GetControllerName(), s.namespace, s.namespace) }