This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 09787ed  feat: upstream retries and timeout (#254)
09787ed is described below

commit 09787ed80507be7f2a7b97d3aefddd28cfcad4a6
Author: Alex Zhang <[email protected]>
AuthorDate: Thu Feb 18 17:14:20 2021 +0800

    feat: upstream retries and timeout (#254)
    
    * feat: upstream retries and timeout
    
    * doc: add usage and specification for retries and timeout
    
    * test: add e2e cases
    
    * fix: style
---
 docs/CRD-specification.md                          |  44 ++++++++
 pkg/apisix/resource.go                             |  14 +--
 pkg/apisix/upstream.go                             |  32 +++---
 pkg/kube/apisix/apis/config/v1/types.go            |  16 +++
 .../apisix/apis/config/v1/zz_generated.deepcopy.go |  26 +++++
 pkg/kube/apisix_upstream.go                        |  49 +++++++++
 pkg/kube/apisix_upstream_test.go                   |  36 +++++++
 pkg/kube/translator.go                             |   3 +
 pkg/types/apisix/v1/types.go                       |  15 +++
 pkg/types/apisix/v1/zz_generated.deepcopy.go       |   5 +
 test/e2e/features/retries_timeout.go               | 114 +++++++++++++++++++++
 11 files changed, 334 insertions(+), 20 deletions(-)

diff --git a/docs/CRD-specification.md b/docs/CRD-specification.md
index 9dffa0b..c63450a 100644
--- a/docs/CRD-specification.md
+++ b/docs/CRD-specification.md
@@ -27,6 +27,7 @@ In order to control the behavior of the proxy ([Apache 
APISIX](https://github.co
 - [ApisixUpstream](#apisixupstream)
   - [Configuring Load Balancer](#configuring-load-balancer)
   - [Configuring Health Check](#configuring-load-balancer)
+  - [Configuring Retry and Timeout](#configuring-retry-and-timeout)
   - [Port Level Settings](#port-level-settings)
   - [Configuration References](#configuration-references)
 - [ApisixTls](#apisixtls)
@@ -200,6 +201,44 @@ up once the healthy conditions are met (three consecutive 
requests got good stat
 Note the active health checker is somewhat duplicated with the 
liveness/readiness probes but it's required if the passive feedback mechanism 
is in use. So once you use the health check feature in ApisixUpstream,
 the active health checker is mandatory.
 
+### Configuring Retry and Timeout
+
+You may want the proxy to retry when requests encounter faults like transient 
network errors
+or service unavailability. By default the retry count is `1`; you can change it 
by specifying the `retries` field.
+
+The following configuration configures the `retries` to `3`, which indicates 
there'll be at most `3` requests sent to
+Kubernetes service `httpbin`'s endpoints.
+
+One should bear in mind that passing a request to the next endpoint is only 
possible
+if nothing has been sent to a client yet. That is, if an error or timeout 
occurs in the middle
+of the transferring of a response, fixing this is impossible.
+
+```yaml
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+  name: httpbin
+spec:
+  retries: 3
+```
+
+The default connect, read and send timeouts are `60s`, which might not be suitable 
for some applications;
+you can change them in the `timeout` field.
+
+```yaml
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+  name: httpbin
+spec:
+  timeout:
+    connect: 5s
+    read: 10s
+    send: 10s
+```
+
+The above example sets the connect, read and send timeouts to `5s`, `10s`, `10s` 
respectively.
+
 ### Port Level Settings
 
 Once in a while a single Kubernetes Service might expose multiple ports which 
provides distinct functions and different Upstream configurations are required.
@@ -250,6 +289,11 @@ In the meanwhile, the ApisixUpstream `foo` sets `http` 
scheme for port `7000` an
 | loadbalancer.type | string | The load balancing type, can be `roundrobin`, 
`ewma`, `least_conn`, `chash`, default is `roundrobin`. |
 | loadbalancer.hashOn | string | The hash value source scope, only take 
effects if the `chash` algorithm is in use. Values can `vars`, `header`, 
`vars_combinations`, `cookie` and `consumers`, default is `vars`. |
 | loadbalancer.key | string | The hash key, only in valid if the `chash` 
algorithm is used.
+| retries | int | The retry count. |
+| timeout | object | The timeout settings. |
+| timeout.connect | time duration in the form "72h3m0.5s" | The connect 
timeout. |
+| timeout.read | time duration in the form "72h3m0.5s" | The read timeout. |
+| timeout.send | time duration in the form "72h3m0.5s" | The send timeout. |
 | healthCheck | object | The health check parameters, see [Health 
Check](https://github.com/apache/apisix/blob/master/doc/health-check.md) for 
more details. |
 | healthCheck.active | object | active health check configuration, which is a 
mandatory field. |
 | healthCheck.active.type | string | health check type, can be `http`, `https` 
and `tcp`, default is `http`. |
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index aa95aad..b5f5f5b 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -155,12 +155,14 @@ func (i *item) upstream(clusterName string) 
(*v1.Upstream, error) {
                        Group:    clusterName,
                        Name:     ups.Desc,
                },
-               Type:   ups.LBType,
-               Key:    ups.Key,
-               HashOn: ups.HashOn,
-               Nodes:  nodes,
-               Scheme: ups.Scheme,
-               Checks: ups.Checks,
+               Type:    ups.LBType,
+               Key:     ups.Key,
+               HashOn:  ups.HashOn,
+               Nodes:   nodes,
+               Scheme:  ups.Scheme,
+               Checks:  ups.Checks,
+               Retries: ups.Retries,
+               Timeout: ups.Timeout,
        }, nil
 }
 
diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go
index 137ab99..43f6c68 100644
--- a/pkg/apisix/upstream.go
+++ b/pkg/apisix/upstream.go
@@ -62,13 +62,15 @@ func (n *upstreamNodes) UnmarshalJSON(p []byte) error {
 }
 
 type upstreamReqBody struct {
-       LBType string                  `json:"type"`
-       HashOn string                  `json:"hash_on,omitempty"`
-       Key    string                  `json:"key,omitempty"`
-       Nodes  upstreamNodes           `json:"nodes"`
-       Desc   string                  `json:"desc"`
-       Scheme string                  `json:"scheme,omitempty"`
-       Checks *v1.UpstreamHealthCheck `json:"checks,omitempty"`
+       LBType  string                  `json:"type"`
+       HashOn  string                  `json:"hash_on,omitempty"`
+       Key     string                  `json:"key,omitempty"`
+       Nodes   upstreamNodes           `json:"nodes"`
+       Desc    string                  `json:"desc"`
+       Scheme  string                  `json:"scheme,omitempty"`
+       Retries int                     `json:"retries,omitempty"`
+       Timeout *v1.UpstreamTimeout     `json:"timeout,omitempty"`
+       Checks  *v1.UpstreamHealthCheck `json:"checks,omitempty"`
 }
 
 type upstreamItem upstreamReqBody
@@ -192,13 +194,15 @@ func (u *upstreamClient) Create(ctx context.Context, obj 
*v1.Upstream) (*v1.Upst
                })
        }
        body, err := json.Marshal(upstreamReqBody{
-               LBType: obj.Type,
-               HashOn: obj.HashOn,
-               Key:    obj.Key,
-               Nodes:  nodes,
-               Desc:   obj.Name,
-               Scheme: obj.Scheme,
-               Checks: obj.Checks,
+               LBType:  obj.Type,
+               HashOn:  obj.HashOn,
+               Key:     obj.Key,
+               Nodes:   nodes,
+               Desc:    obj.Name,
+               Scheme:  obj.Scheme,
+               Checks:  obj.Checks,
+               Retries: obj.Retries,
+               Timeout: obj.Timeout,
        })
        if err != nil {
                return nil, err
diff --git a/pkg/kube/apisix/apis/config/v1/types.go 
b/pkg/kube/apisix/apis/config/v1/types.go
index 87ba3d7..1edc2af 100644
--- a/pkg/kube/apisix/apis/config/v1/types.go
+++ b/pkg/kube/apisix/apis/config/v1/types.go
@@ -105,11 +105,27 @@ type ApisixUpstreamConfig struct {
        // +optional
        Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"`
 
+       // How many times the proxy (Apache APISIX) should retry when
+       // errors occur (error, timeout or bad http status codes like 500, 502).
+       // +optional
+       Retries int `json:"retries,omitempty" yaml:"retries,omitempty"`
+
+       // Timeout settings for the read, send and connect to the upstream.
+       // +optional
+       Timeout *UpstreamTimeout `json:"timeout,omitempty" 
yaml:"timeout,omitempty"`
+
        // The health check configurtions for the upstream.
        // +optional
        HealthCheck *HealthCheck `json:"healthCheck,omitempty" 
yaml:"healthCheck,omitempty"`
 }
 
+// UpstreamTimeout is settings for the read, send and connect to the upstream.
+type UpstreamTimeout struct {
+       Connect metav1.Duration `json:"connect,omitempty" 
yaml:"connect,omitempty"`
+       Send    metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"`
+       Read    metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"`
+}
+
 // PortLevelSettings configures the ApisixUpstreamConfig for each individual 
port. It inherits
 // configurations from the outer level (the whole Kubernetes Service) and 
overrides some of
 // them if they are set on the port level.
diff --git a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go 
b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
index bff68aa..955499b 100644
--- a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
@@ -64,6 +64,7 @@ func (in *ActiveHealthCheck) DeepCopy() *ActiveHealthCheck {
 func (in *ActiveHealthCheckHealthy) DeepCopyInto(out 
*ActiveHealthCheckHealthy) {
        *out = *in
        
in.PassiveHealthCheckHealthy.DeepCopyInto(&out.PassiveHealthCheckHealthy)
+       out.Interval = in.Interval
        return
 }
 
@@ -81,6 +82,7 @@ func (in *ActiveHealthCheckHealthy) DeepCopy() 
*ActiveHealthCheckHealthy {
 func (in *ActiveHealthCheckUnhealthy) DeepCopyInto(out 
*ActiveHealthCheckUnhealthy) {
        *out = *in
        
in.PassiveHealthCheckUnhealthy.DeepCopyInto(&out.PassiveHealthCheckUnhealthy)
+       out.Interval = in.Interval
        return
 }
 
@@ -322,6 +324,11 @@ func (in *ApisixUpstreamConfig) DeepCopyInto(out 
*ApisixUpstreamConfig) {
                *out = new(LoadBalancer)
                **out = **in
        }
+       if in.Timeout != nil {
+               in, out := &in.Timeout, &out.Timeout
+               *out = new(UpstreamTimeout)
+               **out = **in
+       }
        if in.HealthCheck != nil {
                in, out := &in.HealthCheck, &out.HealthCheck
                *out = new(HealthCheck)
@@ -621,3 +628,22 @@ func (in *Rule) DeepCopy() *Rule {
        in.DeepCopyInto(out)
        return out
 }
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *UpstreamTimeout) DeepCopyInto(out *UpstreamTimeout) {
+       *out = *in
+       out.Connect = in.Connect
+       out.Send = in.Send
+       out.Read = in.Read
+       return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new UpstreamTimeout.
+func (in *UpstreamTimeout) DeepCopy() *UpstreamTimeout {
+       if in == nil {
+               return nil
+       }
+       out := new(UpstreamTimeout)
+       in.DeepCopyInto(out)
+       return out
+}
diff --git a/pkg/kube/apisix_upstream.go b/pkg/kube/apisix_upstream.go
index 8697136..f38a053 100644
--- a/pkg/kube/apisix_upstream.go
+++ b/pkg/kube/apisix_upstream.go
@@ -19,6 +19,55 @@ import (
        apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
+func (t *translator) translateUpstreamRetriesAndTimeout(retries int, timeout 
*configv1.UpstreamTimeout, ups *apisixv1.Upstream) error {
+       if retries < 0 {
+               return &translateError{
+                       field:  "retries",
+                       reason: "invalid value",
+               }
+       }
+       ups.Retries = retries
+       if timeout == nil {
+               return nil
+       }
+
+       // The schema of timeout doesn't allow configuring only one or
+       // two items, so we start from the default value for each of them.
+       connTimeout := apisixv1.DefaultUpstreamTimeout
+       readTimeout := apisixv1.DefaultUpstreamTimeout
+       sendTimeout := apisixv1.DefaultUpstreamTimeout
+       if timeout.Connect.Duration < 0 {
+               return &translateError{
+                       field:  "timeout.connect",
+                       reason: "invalid value",
+               }
+       } else if timeout.Connect.Duration > 0 {
+               connTimeout = int(timeout.Connect.Seconds())
+       }
+       if timeout.Read.Duration < 0 {
+               return &translateError{
+                       field:  "timeout.read",
+                       reason: "invalid value",
+               }
+       } else if timeout.Read.Duration > 0 {
+               readTimeout = int(timeout.Read.Seconds())
+       }
+       if timeout.Send.Duration < 0 {
+               return &translateError{
+                       field:  "timeout.send",
+                       reason: "invalid value",
+               }
+       } else if timeout.Send.Duration > 0 {
+               sendTimeout = int(timeout.Send.Seconds())
+       }
+       ups.Timeout = &apisixv1.UpstreamTimeout{
+               Connect: connTimeout,
+               Send:    sendTimeout,
+               Read:    readTimeout,
+       }
+       return nil
+}
+
 func (t *translator) translateUpstreamScheme(scheme string, ups 
*apisixv1.Upstream) error {
        if scheme == "" {
                ups.Scheme = apisixv1.SchemeHTTP
diff --git a/pkg/kube/apisix_upstream_test.go b/pkg/kube/apisix_upstream_test.go
index db0344a..9815996 100644
--- a/pkg/kube/apisix_upstream_test.go
+++ b/pkg/kube/apisix_upstream_test.go
@@ -369,3 +369,39 @@ func TestTranslateUpstreamActiveHealthCheckUnusually(t 
*testing.T) {
                reason: "invalid value",
        })
 }
+
+func TestUpstreamRetriesAndTimeout(t *testing.T) {
+       tr := &translator{}
+       err := tr.translateUpstreamRetriesAndTimeout(-1, nil, nil)
+       assert.Equal(t, err, &translateError{
+               field:  "retries",
+               reason: "invalid value",
+       })
+
+       var ups apisixv1.Upstream
+       err = tr.translateUpstreamRetriesAndTimeout(3, nil, &ups)
+       assert.Nil(t, err)
+       assert.Equal(t, ups.Retries, 3)
+
+       timeout := &configv1.UpstreamTimeout{
+               Connect: metav1.Duration{Duration: time.Second},
+               Read:    metav1.Duration{Duration: -1},
+       }
+       err = tr.translateUpstreamRetriesAndTimeout(3, timeout, &ups)
+       assert.Equal(t, err, &translateError{
+               field:  "timeout.read",
+               reason: "invalid value",
+       })
+
+       timeout = &configv1.UpstreamTimeout{
+               Connect: metav1.Duration{Duration: time.Second},
+               Read:    metav1.Duration{Duration: 15 * time.Second},
+       }
+       err = tr.translateUpstreamRetriesAndTimeout(3, timeout, &ups)
+       assert.Nil(t, err)
+       assert.Equal(t, ups.Timeout, &apisixv1.UpstreamTimeout{
+               Connect: 1,
+               Send:    60,
+               Read:    15,
+       })
+}
diff --git a/pkg/kube/translator.go b/pkg/kube/translator.go
index 1e940b1..a8e5941 100644
--- a/pkg/kube/translator.go
+++ b/pkg/kube/translator.go
@@ -83,6 +83,9 @@ func (t *translator) TranslateUpstreamConfig(au 
*configv1.ApisixUpstreamConfig)
        if err := t.translateUpstreamHealthCheck(au.HealthCheck, ups); err != 
nil {
                return nil, err
        }
+       if err := t.translateUpstreamRetriesAndTimeout(au.Retries, au.Timeout, 
ups); err != nil {
+               return nil, err
+       }
        return ups, nil
 }
 
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 1186fe5..c41a184 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -62,6 +62,9 @@ const (
        // ActiveHealthCheckMinInterval is the minimum interval for
        // the active health check.
        ActiveHealthCheckMinInterval = time.Second
+
+       // Default connect, read and send timeout (in seconds) with upstreams.
+       DefaultUpstreamTimeout = 60
 )
 
 // Metadata contains all meta information about resources.
@@ -130,6 +133,18 @@ type Upstream struct {
        Nodes    []UpstreamNode       `json:"nodes,omitempty" 
yaml:"nodes,omitempty"`
        FromKind string               `json:"from_kind,omitempty" 
yaml:"from_kind,omitempty"`
        Scheme   string               `json:"scheme,omitempty" 
yaml:"scheme,omitempty"`
+       Retries  int                  `json:"retries,omitempty" 
yaml:"retries,omitempty"`
+       Timeout  *UpstreamTimeout     `json:"timeout,omitempty" 
yaml:"timeout,omitempty"`
+}
+
+// UpstreamTimeout represents the timeout settings on Upstream.
+type UpstreamTimeout struct {
+       // Connect is the connect timeout
+       Connect int `json:"connect" yaml:"connect"`
+       // Send is the send timeout
+       Send int `json:"send" yaml:"send"`
+       // Read is the read timeout
+       Read int `json:"read" yaml:"read"`
 }
 
 // Node the node in upstream
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go 
b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index 41b461c..51869da 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -95,6 +95,11 @@ func (in *Upstream) DeepCopyInto(out *Upstream) {
                *out = make([]UpstreamNode, len(*in))
                copy(*out, *in)
        }
+       if in.Timeout != nil {
+               in, out := &in.Timeout, &out.Timeout
+               *out = new(UpstreamTimeout)
+               **out = **in
+       }
        return
 }
 
diff --git a/test/e2e/features/retries_timeout.go 
b/test/e2e/features/retries_timeout.go
new file mode 100644
index 0000000..9e3d757
--- /dev/null
+++ b/test/e2e/features/retries_timeout.go
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package features
+
+import (
+       "fmt"
+       "time"
+
+       "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+       "github.com/onsi/ginkgo"
+       "github.com/stretchr/testify/assert"
+)
+
+var _ = ginkgo.Describe("retries", func() {
+       s := scaffold.NewDefaultScaffold()
+       ginkgo.It("active check", func() {
+               backendSvc, backendPorts := s.DefaultHTTPBackend()
+
+               au := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+  name: %s
+spec:
+  retries: 3
+`, backendSvc)
+               err := s.CreateResourceFromString(au)
+               assert.Nil(ginkgo.GinkgoT(), err, "create ApisixUpstream")
+               time.Sleep(2 * time.Second)
+
+               ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.org
+   http:
+     paths:
+     - backend:
+         serviceName: %s
+         servicePort: %d
+       path: /*
+`, backendSvc, backendPorts[0])
+               err = s.CreateResourceFromString(ar)
+               assert.Nil(ginkgo.GinkgoT(), err)
+               time.Sleep(5 * time.Second)
+
+               ups, err := s.ListApisixUpstreams()
+               assert.Nil(ginkgo.GinkgoT(), err)
+               assert.Len(ginkgo.GinkgoT(), ups, 1)
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Retries, 3)
+       })
+})
+
+var _ = ginkgo.Describe("timeout", func() {
+       s := scaffold.NewDefaultScaffold()
+       ginkgo.It("active check", func() {
+               backendSvc, backendPorts := s.DefaultHTTPBackend()
+
+               au := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+  name: %s
+spec:
+  timeout:
+    read: 10s
+    send: 10s
+`, backendSvc)
+               err := s.CreateResourceFromString(au)
+               assert.Nil(ginkgo.GinkgoT(), err, "create ApisixUpstream")
+               time.Sleep(2 * time.Second)
+
+               ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.org
+   http:
+     paths:
+     - backend:
+         serviceName: %s
+         servicePort: %d
+       path: /*
+`, backendSvc, backendPorts[0])
+               err = s.CreateResourceFromString(ar)
+               assert.Nil(ginkgo.GinkgoT(), err)
+               time.Sleep(5 * time.Second)
+
+               ups, err := s.ListApisixUpstreams()
+               assert.Nil(ginkgo.GinkgoT(), err)
+               assert.Len(ginkgo.GinkgoT(), ups, 1)
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Connect, 60)
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Read, 10)
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Send, 10)
+       })
+})

Reply via email to