This is an automated email from the ASF dual-hosted git repository.

alinsran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c926e66 feat: support retry in case of sync failure (#2534)
7c926e66 is described below

commit 7c926e66b70256c5e132863ab0b3e2ec7fb4043d
Author: AlinsRan <alins...@apache.org>
AuthorDate: Tue Sep 2 08:59:31 2025 +0800

    feat: support retry in case of sync failure (#2534)
---
 internal/provider/apisix/provider.go      | 41 +++++++------
 internal/provider/common/retrier.go       | 96 +++++++++++++++++++++++++++++++
 test/e2e/crds/v2/route.go                 | 55 ++++++++++++++++++
 test/e2e/crds/v2/status.go                | 42 +++++++++++---
 test/e2e/framework/manifests/ingress.yaml |  2 +-
 test/e2e/scaffold/scaffold.go             | 23 ++++++++
 6 files changed, 233 insertions(+), 26 deletions(-)

diff --git a/internal/provider/apisix/provider.go b/internal/provider/apisix/provider.go
index 0246b3e7..f6a8f876 100644
--- a/internal/provider/apisix/provider.go
+++ b/internal/provider/apisix/provider.go
@@ -37,11 +37,19 @@ import (
        "github.com/apache/apisix-ingress-controller/internal/controller/status"
        "github.com/apache/apisix-ingress-controller/internal/manager/readiness"
        "github.com/apache/apisix-ingress-controller/internal/provider"
+       "github.com/apache/apisix-ingress-controller/internal/provider/common"
        "github.com/apache/apisix-ingress-controller/internal/types"
        "github.com/apache/apisix-ingress-controller/internal/utils"
 )
 
-const ProviderTypeAPISIX = "apisix"
+const (
+       ProviderTypeAPISIX = "apisix"
+
+       RetryBaseDelay = 1 * time.Second
+       RetryMaxDelay  = 1000 * time.Second
+
+       MinSyncPeriod = 1 * time.Second
+)
 
 type apisixProvider struct {
        provider.Options
@@ -223,33 +231,32 @@ func (d *apisixProvider) Start(ctx context.Context) error {
 
        initalSyncDelay := d.InitSyncDelay
        if initalSyncDelay > 0 {
-               time.AfterFunc(initalSyncDelay, func() {
-                       if err := d.sync(ctx); err != nil {
-                               log.Error(err)
-                               return
-                       }
-               })
+               time.AfterFunc(initalSyncDelay, d.syncNotify)
        }
 
-       if d.SyncPeriod < 1 {
-               return nil
+       syncPeriod := d.SyncPeriod
+       if syncPeriod < MinSyncPeriod {
+               syncPeriod = MinSyncPeriod
        }
-       ticker := time.NewTicker(d.SyncPeriod)
+       ticker := time.NewTicker(syncPeriod)
        defer ticker.Stop()
+
+       retrier := common.NewRetrier(common.NewExponentialBackoff(RetryBaseDelay, RetryMaxDelay))
+
        for {
-               synced := false
                select {
                case <-d.syncCh:
-                       synced = true
                case <-ticker.C:
-                       synced = true
+               case <-retrier.C():
                case <-ctx.Done():
+                       retrier.Reset()
                        return nil
                }
-               if synced {
-                       if err := d.sync(ctx); err != nil {
-                               log.Error(err)
-                       }
+               if err := d.sync(ctx); err != nil {
+                       log.Error(err)
+                       retrier.Next()
+               } else {
+                       retrier.Reset()
                }
        }
 }
diff --git a/internal/provider/common/retrier.go b/internal/provider/common/retrier.go
new file mode 100644
index 00000000..1277ee93
--- /dev/null
+++ b/internal/provider/common/retrier.go
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+       "sync"
+       "time"
+)
+
+type Backoff interface {
+       Next() time.Duration
+       Reset()
+}
+
+type ExponentialBackoff struct {
+       base, max, current time.Duration
+}
+
+func NewExponentialBackoff(base, max time.Duration) *ExponentialBackoff {
+       return &ExponentialBackoff{base: base, max: max, current: base}
+}
+
+func (b *ExponentialBackoff) Next() time.Duration {
+       delay := b.current
+       b.current *= 2
+       if b.current > b.max {
+               b.current = b.max
+       }
+       return delay
+}
+
+func (b *ExponentialBackoff) Reset() {
+       b.current = b.base
+}
+
+type Retrier struct {
+       mu      sync.Mutex
+       ch      chan struct{}
+       timer   *time.Timer
+       backoff Backoff
+}
+
+func NewRetrier(b Backoff) *Retrier {
+       return &Retrier{
+               ch:      make(chan struct{}, 1),
+               backoff: b,
+       }
+}
+
+func (r *Retrier) Reset() {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       if r.timer != nil {
+               r.timer.Stop()
+               r.timer = nil
+       }
+       r.backoff.Reset()
+}
+
+func (r *Retrier) Next() {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       if r.timer != nil {
+               r.timer.Stop()
+               r.timer = nil
+       }
+
+       delay := r.backoff.Next()
+       r.timer = time.AfterFunc(delay, func() {
+               select {
+               case r.ch <- struct{}{}:
+               default:
+               }
+       })
+}
+
+func (r *Retrier) C() <-chan struct{} {
+       return r.ch
+}
diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go
index 605a7179..ac1e47c7 100644
--- a/test/e2e/crds/v2/route.go
+++ b/test/e2e/crds/v2/route.go
@@ -1676,4 +1676,59 @@ spec:
                        })
                })
        })
+
+       Context("Exception Test", func() {
+               const apisixRouteSpec = `
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+  name: default
+spec:
+  ingressClassName: %s
+  http:
+  - name: rule0
+    match:
+      hosts:
+      - httpbin
+      paths:
+      - /*
+    backends:
+    - serviceName: httpbin-service-e2e-test
+      servicePort: 80
+`
+               It("try again when sync failed", func() {
+                       s.Deployer.ScaleDataplane(0)
+
+                       err := s.CreateResourceFromString(fmt.Sprintf(apisixRouteSpec, s.Namespace()))
+                       Expect(err).NotTo(HaveOccurred(), "creating ApisixRoute")
+
+                       By("check ApisixRoute status")
+                       s.RetryAssertion(func() string {
+                               output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
+                               return output
+                       }).WithTimeout(30 * time.Second).
+                               Should(
+                                       And(
+                                               ContainSubstring(`status: "False"`),
+                                               ContainSubstring(`reason: SyncFailed`),
+                                       ),
+                               )
+
+                       s.Deployer.ScaleDataplane(1)
+
+                       s.RetryAssertion(func() string {
+                               output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
+                               return output
+                       }).WithTimeout(60 * time.Second).
+                               Should(ContainSubstring(`status: "True"`))
+
+                       By("check route in APISIX")
+                       s.RequestAssert(&scaffold.RequestAssert{
+                               Method: "GET",
+                               Path:   "/get",
+                               Host:   "httpbin",
+                               Check:  scaffold.WithExpectedStatus(200),
+                       })
+               })
+       })
 })
diff --git a/test/e2e/crds/v2/status.go b/test/e2e/crds/v2/status.go
index 06170a6d..7b7defe6 100644
--- a/test/e2e/crds/v2/status.go
+++ b/test/e2e/crds/v2/status.go
@@ -24,6 +24,7 @@ import (
 
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
+       corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/types"
        "sigs.k8s.io/yaml"
 
@@ -41,7 +42,7 @@ var _ = Describe("Test CRD Status", Label("apisix.apache.org", "v2", "apisixrout
        Context("Test ApisixRoute Sync Status", func() {
                BeforeEach(func() {
                        By("create GatewayProxy")
-                       gatewayProxy := s.GetGatewayProxyYaml()
+                       gatewayProxy := s.GetGatewayProxyWithServiceYaml()
                        err := s.CreateResourceFromString(gatewayProxy)
                        Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy")
                        time.Sleep(5 * time.Second)
@@ -144,7 +145,8 @@ spec:
 
                It("dataplane unavailable", func() {
                        By("apply ApisixRoute")
-                       applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, fmt.Sprintf(ar, s.Namespace(), s.Namespace()))
+                       arYaml := fmt.Sprintf(ar, s.Namespace(), s.Namespace())
+                       applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, arYaml)
 
                        By("check route in APISIX")
                        s.RequestAssert(&scaffold.RequestAssert{
@@ -154,13 +156,28 @@ spec:
                                Check:   scaffold.WithExpectedStatus(200),
                        })
 
-                       s.Deployer.ScaleDataplane(0)
+                       By("get yaml from service")
+                       serviceYaml, err := s.GetOutputFromString("svc", framework.ProviderType, "-o", "yaml")
+                       Expect(err).NotTo(HaveOccurred(), "getting service yaml")
+                       By("update service to type ExternalName with invalid host")
+                       var k8sservice corev1.Service
+                       err = yaml.Unmarshal([]byte(serviceYaml), &k8sservice)
+                       Expect(err).NotTo(HaveOccurred(), "unmarshalling service")
+                       oldSpec := k8sservice.Spec
+                       k8sservice.Spec = corev1.ServiceSpec{
+                               Type:         corev1.ServiceTypeExternalName,
+                               ExternalName: "invalid.host",
+                       }
+                       newServiceYaml, err := yaml.Marshal(k8sservice)
+                       Expect(err).NotTo(HaveOccurred(), "marshalling service")
+                       err = s.CreateResourceFromString(string(newServiceYaml))
+                       Expect(err).NotTo(HaveOccurred(), "creating service")
 
                        By("check ApisixRoute status")
                        s.RetryAssertion(func() string {
-                               output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
+                               output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml")
                                return output
-                       }).WithTimeout(80 * time.Second).
+                       }).WithTimeout(60 * time.Second).
                                Should(
                                        And(
                                                ContainSubstring(`status: "False"`),
@@ -168,13 +185,22 @@ spec:
                                        ),
                                )
 
-                       s.Deployer.ScaleDataplane(1)
+                       By("update service to original spec")
+                       serviceYaml, err = s.GetOutputFromString("svc", framework.ProviderType, "-o", "yaml")
+                       Expect(err).NotTo(HaveOccurred(), "getting service yaml")
+                       err = yaml.Unmarshal([]byte(serviceYaml), &k8sservice)
+                       Expect(err).NotTo(HaveOccurred(), "unmarshalling service")
+                       k8sservice.Spec = oldSpec
+                       newServiceYaml, err = yaml.Marshal(k8sservice)
+                       Expect(err).NotTo(HaveOccurred(), "marshalling service")
+                       err = s.CreateResourceFromString(string(newServiceYaml))
+                       Expect(err).NotTo(HaveOccurred(), "creating service")
 
                        By("check ApisixRoute status after scaling up")
                        s.RetryAssertion(func() string {
-                               output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
+                               output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml")
                                return output
-                       }).WithTimeout(80 * time.Second).
+                       }).WithTimeout(60 * time.Second).
                                Should(
                                        And(
                                                ContainSubstring(`status: "True"`),
diff --git a/test/e2e/framework/manifests/ingress.yaml b/test/e2e/framework/manifests/ingress.yaml
index 1ea6a88f..c411a93d 100644
--- a/test/e2e/framework/manifests/ingress.yaml
+++ b/test/e2e/framework/manifests/ingress.yaml
@@ -333,7 +333,7 @@ data:
                                         # The period between two consecutive syncs.
                                         # The default value is 0 seconds, which means the controller will not sync.
                                         # If you want to enable the sync, set it to a positive value.
-      init_sync_delay: {{ .InitSyncDelay | default "1m" }}
+      init_sync_delay: {{ .InitSyncDelay | default "20m" }}
 ---
 apiVersion: v1
 kind: Service
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index 5d4509c2..9d8dfff1 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -460,6 +460,25 @@ spec:
           value: "%s"
 `
 
+const gatewayProxyWithServiceYaml = `
+apiVersion: apisix.apache.org/v1alpha1
+kind: GatewayProxy
+metadata:
+  name: %s
+  namespace: %s
+spec:
+  provider:
+    type: ControlPlane
+    controlPlane:
+      service:
+        name: %s
+        port: 9180
+      auth:
+        type: AdminKey
+        adminKey:
+          value: "%s"
+`
+
 const ingressClassYaml = `
 apiVersion: networking.k8s.io/v1
 kind: IngressClass
@@ -479,6 +498,10 @@ func (s *Scaffold) GetGatewayProxyYaml() string {
        return fmt.Sprintf(gatewayProxyYaml, s.namespace, s.namespace, s.Deployer.GetAdminEndpoint(), s.AdminKey())
 }
 
+func (s *Scaffold) GetGatewayProxyWithServiceYaml() string {
+       return fmt.Sprintf(gatewayProxyWithServiceYaml, s.namespace, s.namespace, s.dataplaneService.Name, s.AdminKey())
+}
+
 func (s *Scaffold) GetIngressClassYaml() string {
        return fmt.Sprintf(ingressClassYaml, s.namespace, s.GetControllerName(), s.namespace, s.namespace)
 }

Reply via email to