This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 09787ed feat: upstream retries and timeout (#254)
09787ed is described below
commit 09787ed80507be7f2a7b97d3aefddd28cfcad4a6
Author: Alex Zhang <[email protected]>
AuthorDate: Thu Feb 18 17:14:20 2021 +0800
feat: upstream retries and timeout (#254)
* feat: upstream retries and timeout
* doc: add usage and specification for retries and timeout
* test: add e2e cases
* fix: style
---
docs/CRD-specification.md | 44 ++++++++
pkg/apisix/resource.go | 14 +--
pkg/apisix/upstream.go | 32 +++---
pkg/kube/apisix/apis/config/v1/types.go | 16 +++
.../apisix/apis/config/v1/zz_generated.deepcopy.go | 26 +++++
pkg/kube/apisix_upstream.go | 49 +++++++++
pkg/kube/apisix_upstream_test.go | 36 +++++++
pkg/kube/translator.go | 3 +
pkg/types/apisix/v1/types.go | 15 +++
pkg/types/apisix/v1/zz_generated.deepcopy.go | 5 +
test/e2e/features/retries_timeout.go | 114 +++++++++++++++++++++
11 files changed, 334 insertions(+), 20 deletions(-)
diff --git a/docs/CRD-specification.md b/docs/CRD-specification.md
index 9dffa0b..c63450a 100644
--- a/docs/CRD-specification.md
+++ b/docs/CRD-specification.md
@@ -27,6 +27,7 @@ In order to control the behavior of the proxy ([Apache
APISIX](https://github.co
- [ApisixUpstream](#apisixupstream)
- [Configuring Load Balancer](#configuring-load-balancer)
- [Configuring Health Check](#configuring-load-balancer)
+ - [Configuring Retry and Timeout](#configuring-retry-and-timeout)
- [Port Level Settings](#port-level-settings)
- [Configuration References](#configuration-references)
- [ApisixTls](#apisixtls)
@@ -200,6 +201,44 @@ up once the healthy conditions are met (three consecutive
requests got good stat
Note the active health checker is somewhat duplicated with the
liveness/readiness probes but it's required if the passive feedback mechanism
is in use. So once you use the health check feature in ApisixUpstream,
the active health checker is mandatory.
+### Configuring Retry and Timeout
+
+You may want the proxy to retry when requests hit faults such as transient
network errors
+or an unavailable service. By default the retry count is `1`; you can change it
by specifying the `retries` field.
+
+The following configuration configures the `retries` to `3`, which indicates
there'll be at most `3` requests sent to
+Kubernetes service `httpbin`'s endpoints.
+
+One should bear in mind that passing a request to the next endpoint is only
possible
+if nothing has been sent to a client yet. That is, if an error or timeout
occurs in the middle
+of the transferring of a response, fixing this is impossible.
+
+```yaml
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+ name: httpbin
+spec:
+ retries: 3
+```
+
+The default connect, read and send timeouts are `60s`, which might not be proper
for some applications;
+just change them in the `timeout` field.
+
+```yaml
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+ name: httpbin
+spec:
+ timeout:
+ connect: 5s
+ read: 10s
+ send: 10s
+```
+
+The above example sets the connect, read and send timeouts to `5s`, `10s` and `10s`
respectively.
+
### Port Level Settings
Once in a while a single Kubernetes Service might expose multiple ports which
provides distinct functions and different Upstream configurations are required.
@@ -250,6 +289,11 @@ In the meanwhile, the ApisixUpstream `foo` sets `http`
scheme for port `7000` an
| loadbalancer.type | string | The load balancing type, can be `roundrobin`,
`ewma`, `least_conn`, `chash`, default is `roundrobin`. |
| loadbalancer.hashOn | string | The hash value source scope, only take
effects if the `chash` algorithm is in use. Values can `vars`, `header`,
`vars_combinations`, `cookie` and `consumers`, default is `vars`. |
| loadbalancer.key | string | The hash key, only in valid if the `chash`
algorithm is used.
+| retries | int | The retry count. |
+| timeout | object | The timeout settings. |
+| timeout.connect | time duration in the form "72h3m0.5s" | The connect
timeout. |
+| timeout.read | time duration in the form "72h3m0.5s" | The read timeout. |
+| timeout.send | time duration in the form "72h3m0.5s" | The send timeout. |
| healthCheck | object | The health check parameters, see [Health
Check](https://github.com/apache/apisix/blob/master/doc/health-check.md) for
more details. |
| healthCheck.active | object | active health check configuration, which is a
mandatory field. |
| healthCheck.active.type | string | health check type, can be `http`, `https`
and `tcp`, default is `http`. |
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index aa95aad..b5f5f5b 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -155,12 +155,14 @@ func (i *item) upstream(clusterName string)
(*v1.Upstream, error) {
Group: clusterName,
Name: ups.Desc,
},
- Type: ups.LBType,
- Key: ups.Key,
- HashOn: ups.HashOn,
- Nodes: nodes,
- Scheme: ups.Scheme,
- Checks: ups.Checks,
+ Type: ups.LBType,
+ Key: ups.Key,
+ HashOn: ups.HashOn,
+ Nodes: nodes,
+ Scheme: ups.Scheme,
+ Checks: ups.Checks,
+ Retries: ups.Retries,
+ Timeout: ups.Timeout,
}, nil
}
diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go
index 137ab99..43f6c68 100644
--- a/pkg/apisix/upstream.go
+++ b/pkg/apisix/upstream.go
@@ -62,13 +62,15 @@ func (n *upstreamNodes) UnmarshalJSON(p []byte) error {
}
type upstreamReqBody struct {
- LBType string `json:"type"`
- HashOn string `json:"hash_on,omitempty"`
- Key string `json:"key,omitempty"`
- Nodes upstreamNodes `json:"nodes"`
- Desc string `json:"desc"`
- Scheme string `json:"scheme,omitempty"`
- Checks *v1.UpstreamHealthCheck `json:"checks,omitempty"`
+ LBType string `json:"type"`
+ HashOn string `json:"hash_on,omitempty"`
+ Key string `json:"key,omitempty"`
+ Nodes upstreamNodes `json:"nodes"`
+ Desc string `json:"desc"`
+ Scheme string `json:"scheme,omitempty"`
+ Retries int `json:"retries,omitempty"`
+ Timeout *v1.UpstreamTimeout `json:"timeout,omitempty"`
+ Checks *v1.UpstreamHealthCheck `json:"checks,omitempty"`
}
type upstreamItem upstreamReqBody
@@ -192,13 +194,15 @@ func (u *upstreamClient) Create(ctx context.Context, obj
*v1.Upstream) (*v1.Upst
})
}
body, err := json.Marshal(upstreamReqBody{
- LBType: obj.Type,
- HashOn: obj.HashOn,
- Key: obj.Key,
- Nodes: nodes,
- Desc: obj.Name,
- Scheme: obj.Scheme,
- Checks: obj.Checks,
+ LBType: obj.Type,
+ HashOn: obj.HashOn,
+ Key: obj.Key,
+ Nodes: nodes,
+ Desc: obj.Name,
+ Scheme: obj.Scheme,
+ Checks: obj.Checks,
+ Retries: obj.Retries,
+ Timeout: obj.Timeout,
})
if err != nil {
return nil, err
diff --git a/pkg/kube/apisix/apis/config/v1/types.go
b/pkg/kube/apisix/apis/config/v1/types.go
index 87ba3d7..1edc2af 100644
--- a/pkg/kube/apisix/apis/config/v1/types.go
+++ b/pkg/kube/apisix/apis/config/v1/types.go
@@ -105,11 +105,27 @@ type ApisixUpstreamConfig struct {
// +optional
Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"`
+ // How many times the proxy (Apache APISIX) should retry when
+ // errors occur (error, timeout or bad http status codes like 500, 502).
+ // +optional
+ Retries int `json:"retries,omitempty" yaml:"retries,omitempty"`
+
+ // Timeout settings for the read, send and connect to the upstream.
+ // +optional
+ Timeout *UpstreamTimeout `json:"timeout,omitempty"
yaml:"timeout,omitempty"`
+
// The health check configurtions for the upstream.
// +optional
HealthCheck *HealthCheck `json:"healthCheck,omitempty"
yaml:"healthCheck,omitempty"`
}
+// UpstreamTimeout is settings for the read, send and connect to the upstream.
+type UpstreamTimeout struct {
+ Connect metav1.Duration `json:"connect,omitempty"
yaml:"connect,omitempty"`
+ Send metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"`
+ Read metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"`
+}
+
// PortLevelSettings configures the ApisixUpstreamConfig for each individual
port. It inherits
// configurations from the outer level (the whole Kubernetes Service) and
overrides some of
// them if they are set on the port level.
diff --git a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
index bff68aa..955499b 100644
--- a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
@@ -64,6 +64,7 @@ func (in *ActiveHealthCheck) DeepCopy() *ActiveHealthCheck {
func (in *ActiveHealthCheckHealthy) DeepCopyInto(out
*ActiveHealthCheckHealthy) {
*out = *in
in.PassiveHealthCheckHealthy.DeepCopyInto(&out.PassiveHealthCheckHealthy)
+ out.Interval = in.Interval
return
}
@@ -81,6 +82,7 @@ func (in *ActiveHealthCheckHealthy) DeepCopy()
*ActiveHealthCheckHealthy {
func (in *ActiveHealthCheckUnhealthy) DeepCopyInto(out
*ActiveHealthCheckUnhealthy) {
*out = *in
in.PassiveHealthCheckUnhealthy.DeepCopyInto(&out.PassiveHealthCheckUnhealthy)
+ out.Interval = in.Interval
return
}
@@ -322,6 +324,11 @@ func (in *ApisixUpstreamConfig) DeepCopyInto(out
*ApisixUpstreamConfig) {
*out = new(LoadBalancer)
**out = **in
}
+ if in.Timeout != nil {
+ in, out := &in.Timeout, &out.Timeout
+ *out = new(UpstreamTimeout)
+ **out = **in
+ }
if in.HealthCheck != nil {
in, out := &in.HealthCheck, &out.HealthCheck
*out = new(HealthCheck)
@@ -621,3 +628,22 @@ func (in *Rule) DeepCopy() *Rule {
in.DeepCopyInto(out)
return out
}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *UpstreamTimeout) DeepCopyInto(out *UpstreamTimeout) {
+ *out = *in
+ out.Connect = in.Connect
+ out.Send = in.Send
+ out.Read = in.Read
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new UpstreamTimeout.
+func (in *UpstreamTimeout) DeepCopy() *UpstreamTimeout {
+ if in == nil {
+ return nil
+ }
+ out := new(UpstreamTimeout)
+ in.DeepCopyInto(out)
+ return out
+}
diff --git a/pkg/kube/apisix_upstream.go b/pkg/kube/apisix_upstream.go
index 8697136..f38a053 100644
--- a/pkg/kube/apisix_upstream.go
+++ b/pkg/kube/apisix_upstream.go
@@ -19,6 +19,55 @@ import (
apisixv1
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
+func (t *translator) translateUpstreamRetriesAndTimeout(retries int, timeout
*configv1.UpstreamTimeout, ups *apisixv1.Upstream) error {
+ if retries < 0 {
+ return &translateError{
+ field: "retries",
+ reason: "invalid value",
+ }
+ }
+ ups.Retries = retries
+ if timeout == nil {
+ return nil
+ }
+
+ // Since the schema of timeout doesn't allow configuring only
+ // one or two items, here we assign the default values first.
+ connTimeout := apisixv1.DefaultUpstreamTimeout
+ readTimeout := apisixv1.DefaultUpstreamTimeout
+ sendTimeout := apisixv1.DefaultUpstreamTimeout
+ if timeout.Connect.Duration < 0 {
+ return &translateError{
+ field: "timeout.connect",
+ reason: "invalid value",
+ }
+ } else if timeout.Connect.Duration > 0 {
+ connTimeout = int(timeout.Connect.Seconds())
+ }
+ if timeout.Read.Duration < 0 {
+ return &translateError{
+ field: "timeout.read",
+ reason: "invalid value",
+ }
+ } else if timeout.Read.Duration > 0 {
+ readTimeout = int(timeout.Read.Seconds())
+ }
+ if timeout.Send.Duration < 0 {
+ return &translateError{
+ field: "timeout.send",
+ reason: "invalid value",
+ }
+ } else if timeout.Send.Duration > 0 {
+ sendTimeout = int(timeout.Send.Seconds())
+ }
+ ups.Timeout = &apisixv1.UpstreamTimeout{
+ Connect: connTimeout,
+ Send: sendTimeout,
+ Read: readTimeout,
+ }
+ return nil
+}
+
func (t *translator) translateUpstreamScheme(scheme string, ups
*apisixv1.Upstream) error {
if scheme == "" {
ups.Scheme = apisixv1.SchemeHTTP
diff --git a/pkg/kube/apisix_upstream_test.go b/pkg/kube/apisix_upstream_test.go
index db0344a..9815996 100644
--- a/pkg/kube/apisix_upstream_test.go
+++ b/pkg/kube/apisix_upstream_test.go
@@ -369,3 +369,39 @@ func TestTranslateUpstreamActiveHealthCheckUnusually(t
*testing.T) {
reason: "invalid value",
})
}
+
+func TestUpstreamRetriesAndTimeout(t *testing.T) {
+ tr := &translator{}
+ err := tr.translateUpstreamRetriesAndTimeout(-1, nil, nil)
+ assert.Equal(t, err, &translateError{
+ field: "retries",
+ reason: "invalid value",
+ })
+
+ var ups apisixv1.Upstream
+ err = tr.translateUpstreamRetriesAndTimeout(3, nil, &ups)
+ assert.Nil(t, err)
+ assert.Equal(t, ups.Retries, 3)
+
+ timeout := &configv1.UpstreamTimeout{
+ Connect: metav1.Duration{Duration: time.Second},
+ Read: metav1.Duration{Duration: -1},
+ }
+ err = tr.translateUpstreamRetriesAndTimeout(3, timeout, &ups)
+ assert.Equal(t, err, &translateError{
+ field: "timeout.read",
+ reason: "invalid value",
+ })
+
+ timeout = &configv1.UpstreamTimeout{
+ Connect: metav1.Duration{Duration: time.Second},
+ Read: metav1.Duration{Duration: 15 * time.Second},
+ }
+ err = tr.translateUpstreamRetriesAndTimeout(3, timeout, &ups)
+ assert.Nil(t, err)
+ assert.Equal(t, ups.Timeout, &apisixv1.UpstreamTimeout{
+ Connect: 1,
+ Send: 60,
+ Read: 15,
+ })
+}
diff --git a/pkg/kube/translator.go b/pkg/kube/translator.go
index 1e940b1..a8e5941 100644
--- a/pkg/kube/translator.go
+++ b/pkg/kube/translator.go
@@ -83,6 +83,9 @@ func (t *translator) TranslateUpstreamConfig(au
*configv1.ApisixUpstreamConfig)
if err := t.translateUpstreamHealthCheck(au.HealthCheck, ups); err !=
nil {
return nil, err
}
+ if err := t.translateUpstreamRetriesAndTimeout(au.Retries, au.Timeout,
ups); err != nil {
+ return nil, err
+ }
return ups, nil
}
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 1186fe5..c41a184 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -62,6 +62,9 @@ const (
// ActiveHealthCheckMinInterval is the minimum interval for
// the active health check.
ActiveHealthCheckMinInterval = time.Second
+
+ // Default connect, read and send timeout (in seconds) with upstreams.
+ DefaultUpstreamTimeout = 60
)
// Metadata contains all meta information about resources.
@@ -130,6 +133,18 @@ type Upstream struct {
Nodes []UpstreamNode `json:"nodes,omitempty"
yaml:"nodes,omitempty"`
FromKind string `json:"from_kind,omitempty"
yaml:"from_kind,omitempty"`
Scheme string `json:"scheme,omitempty"
yaml:"scheme,omitempty"`
+ Retries int `json:"retries,omitempty"
yaml:"retries,omitempty"`
+ Timeout *UpstreamTimeout `json:"timeout,omitempty"
yaml:"timeout,omitempty"`
+}
+
+// UpstreamTimeout represents the timeout settings on Upstream.
+type UpstreamTimeout struct {
+ // Connect is the connect timeout
+ Connect int `json:"connect" yaml:"connect"`
+ // Send is the send timeout
+ Send int `json:"send" yaml:"send"`
+ // Read is the read timeout
+ Read int `json:"read" yaml:"read"`
}
// Node the node in upstream
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go
b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index 41b461c..51869da 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -95,6 +95,11 @@ func (in *Upstream) DeepCopyInto(out *Upstream) {
*out = make([]UpstreamNode, len(*in))
copy(*out, *in)
}
+ if in.Timeout != nil {
+ in, out := &in.Timeout, &out.Timeout
+ *out = new(UpstreamTimeout)
+ **out = **in
+ }
return
}
diff --git a/test/e2e/features/retries_timeout.go
b/test/e2e/features/retries_timeout.go
new file mode 100644
index 0000000..9e3d757
--- /dev/null
+++ b/test/e2e/features/retries_timeout.go
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package features
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+ "github.com/onsi/ginkgo"
+ "github.com/stretchr/testify/assert"
+)
+
+var _ = ginkgo.Describe("retries", func() {
+ s := scaffold.NewDefaultScaffold()
+ ginkgo.It("active check", func() {
+ backendSvc, backendPorts := s.DefaultHTTPBackend()
+
+ au := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+ name: %s
+spec:
+ retries: 3
+`, backendSvc)
+ err := s.CreateResourceFromString(au)
+ assert.Nil(ginkgo.GinkgoT(), err, "create ApisixUpstream")
+ time.Sleep(2 * time.Second)
+
+ ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.org
+ http:
+ paths:
+ - backend:
+ serviceName: %s
+ servicePort: %d
+ path: /*
+`, backendSvc, backendPorts[0])
+ err = s.CreateResourceFromString(ar)
+ assert.Nil(ginkgo.GinkgoT(), err)
+ time.Sleep(5 * time.Second)
+
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, 1)
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Retries, 3)
+ })
+})
+
+var _ = ginkgo.Describe("timeout", func() {
+ s := scaffold.NewDefaultScaffold()
+ ginkgo.It("active check", func() {
+ backendSvc, backendPorts := s.DefaultHTTPBackend()
+
+ au := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+ name: %s
+spec:
+ timeout:
+ read: 10s
+ send: 10s
+`, backendSvc)
+ err := s.CreateResourceFromString(au)
+ assert.Nil(ginkgo.GinkgoT(), err, "create ApisixUpstream")
+ time.Sleep(2 * time.Second)
+
+ ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.org
+ http:
+ paths:
+ - backend:
+ serviceName: %s
+ servicePort: %d
+ path: /*
+`, backendSvc, backendPorts[0])
+ err = s.CreateResourceFromString(ar)
+ assert.Nil(ginkgo.GinkgoT(), err)
+ time.Sleep(5 * time.Second)
+
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, 1)
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Connect, 60)
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Read, 10)
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Send, 10)
+ })
+})