This is an automated email from the ASF dual-hosted git repository.
zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new c8d3bd52 feat: add support for integrate with DP service discovery
(#1465)
c8d3bd52 is described below
commit c8d3bd52fd8820475be763cd52b51b981382f285
Author: Jintao Zhang <[email protected]>
AuthorDate: Fri Nov 25 17:33:14 2022 +0800
feat: add support for integrate with DP service discovery (#1465)
The APISIX data plane has provided the ability to integrate with service
discovery components.
There are two ways to integrate APISIX Ingress with service discovery
components:
* Data plane implementation:
* Configure service discovery and other related components on the APISIX
data plane.
* APISIX Ingress only takes configurations from service
discovery components into account when processing certain
configurations/translations, for compatibility.
* Control plane implementation:
* Connect the service discovery component through the APISIX Ingress
controller, and push the upstream data obtained from the service discovery
component together with other configuration items to the data plane for
configuration.
* NOTE: This method is similar to the apisix-seed project.
* This mode is relatively simple and unified, and the required functions
can be realized without requiring the data plane to be aware of service
discovery components.
The solution we are going to implement this time is the first one, which is
implemented on the data plane.
---
pkg/apisix/cluster.go | 4 +
pkg/apisix/upstreamservicerelation.go | 4 +
pkg/kube/apisix/apis/config/v2/types.go | 11 +
.../apisix/apis/config/v2/zz_generated.deepcopy.go | 28 ++
pkg/providers/apisix/apisix_upstream.go | 135 +++++-----
pkg/providers/apisix/translation/apisix_route.go | 2 +-
.../apisix/translation/apisix_upstream.go | 18 +-
pkg/providers/k8s/endpoint/endpointslice.go | 2 +-
pkg/providers/translation/apisix_upstream.go | 14 +
pkg/providers/types/types.go | 4 +
pkg/types/apisix/v1/types.go | 82 ++++++
pkg/types/apisix/v1/zz_generated.deepcopy.go | 7 +
samples/deploy/crd/v1/ApisixUpstream.yaml | 11 +
test/e2e/suite-features/external-sd.go | 286 +++++++++++++++++++++
test/e2e/testdata/apisix-gw-config-v3-with-sd.yaml | 58 +++++
test/e2e/testdata/apisix-gw-config-with-sd.yaml | 51 ++++
16 files changed, 645 insertions(+), 72 deletions(-)
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 7b0914a6..4aefc124 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -714,6 +714,10 @@ func (c *cluster) updateResource(ctx context.Context, url,
resource string, body
if resp.StatusCode != http.StatusOK && resp.StatusCode !=
http.StatusCreated {
body := readBody(resp.Body, url)
+ log.Debugw("update response",
+ zap.Int("status code %d", resp.StatusCode),
+ zap.String("body %s", body),
+ )
if c.isFunctionDisabled(body) {
return nil, ErrFunctionDisabled
}
diff --git a/pkg/apisix/upstreamservicerelation.go
b/pkg/apisix/upstreamservicerelation.go
index b7a00779..64a5e03b 100644
--- a/pkg/apisix/upstreamservicerelation.go
+++ b/pkg/apisix/upstreamservicerelation.go
@@ -75,8 +75,12 @@ func (u *upstreamService) Delete(ctx context.Context,
serviceName string) error
continue
}
ups.Nodes = make(v1.UpstreamNodes, 0)
+ log.Debugw("try to update upstream in cluster",
+ zap.Any("upstream", ups),
+ )
_, err = u.cluster.upstream.Update(ctx, ups)
if err != nil {
+ log.Error(err)
continue
}
}
diff --git a/pkg/kube/apisix/apis/config/v2/types.go
b/pkg/kube/apisix/apis/config/v2/types.go
index 0dfa089c..da481997 100644
--- a/pkg/kube/apisix/apis/config/v2/types.go
+++ b/pkg/kube/apisix/apis/config/v2/types.go
@@ -492,6 +492,10 @@ type ApisixUpstreamConfig struct {
// service versions.
// +optional
Subsets []ApisixUpstreamSubset `json:"subsets,omitempty"
yaml:"subsets,omitempty"`
+
+ // Discovery is used to configure service discovery for upstream.
+ // +optional
+ Discovery *Discovery `json:"discovery,omitempty"
yaml:"discovery,omitempty"`
}
// ApisixUpstreamExternalType is the external service type
@@ -603,6 +607,13 @@ type PassiveHealthCheckUnhealthy struct {
Timeouts int `json:"timeout,omitempty" yaml:"timeout,omitempty"`
}
+// Discovery defines Service discovery related configuration.
+type Discovery struct {
+ ServiceName string `json:"serviceName" yaml:"serviceName"`
+ Type string `json:"type" yaml:"type"`
+ Args map[string]string `json:"args,omitempty"
yaml:"args,omitempty"`
+}
+
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ApisixUpstreamList struct {
metav1.TypeMeta `json:",inline" yaml:",inline"`
diff --git a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
index 339a15f3..b5892305 100644
--- a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
@@ -1274,6 +1274,11 @@ func (in *ApisixUpstreamConfig) DeepCopyInto(out
*ApisixUpstreamConfig) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
+ if in.Discovery != nil {
+ in, out := &in.Discovery, &out.Discovery
+ *out = new(Discovery)
+ (*in).DeepCopyInto(*out)
+ }
return
}
@@ -1411,6 +1416,29 @@ func (in *ApisixUpstreamSubset) DeepCopy()
*ApisixUpstreamSubset {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *Discovery) DeepCopyInto(out *Discovery) {
+ *out = *in
+ if in.Args != nil {
+ in, out := &in.Args, &out.Args
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new Discovery.
+func (in *Discovery) DeepCopy() *Discovery {
+ if in == nil {
+ return nil
+ }
+ out := new(Discovery)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
func (in *HealthCheck) DeepCopyInto(out *HealthCheck) {
*out = *in
diff --git a/pkg/providers/apisix/apisix_upstream.go
b/pkg/providers/apisix/apisix_upstream.go
index e3992abc..0db5ae44 100644
--- a/pkg/providers/apisix/apisix_upstream.go
+++ b/pkg/providers/apisix/apisix_upstream.go
@@ -283,7 +283,8 @@ func (c *apisixUpstreamController) sync(ctx
context.Context, ev *types.Event) er
return nil
}
- if len(au.Spec.ExternalNodes) != 0 {
+ // We will prioritize ExternalNodes and Discovery.
+ if len(au.Spec.ExternalNodes) != 0 || au.Spec.Discovery != nil {
var newUps *apisixv1.Upstream
if ev.Type != types.EventDelete {
cfg := &au.Spec.ApisixUpstreamConfig
@@ -299,12 +300,19 @@ func (c *apisixUpstreamController) sync(ctx
context.Context, ev *types.Event) er
}
}
- err := c.updateExternalNodes(ctx, au, nil, newUps)
- if err != nil {
- return err
+ if len(au.Spec.ExternalNodes) != 0 {
+ return c.updateExternalNodes(ctx, au, nil,
newUps, au.Namespace, au.Name)
}
- return nil
+ // for service discovery related configuration
+ if au.Spec.Discovery.ServiceName == "" ||
au.Spec.Discovery.Type == "" {
+ log.Error("If you setup Discovery for
ApisixUpstream, you need to specify the ServiceName and Type fields.")
+ return fmt.Errorf("No ServiceName or Type
fields found")
+ }
+ // updateUpstream for real
+ upsName :=
apisixv1.ComposeExternalUpstreamName(au.Namespace, au.Name)
+ return c.updateUpstream(ctx, upsName,
&au.Spec.ApisixUpstreamConfig)
+
}
var portLevelSettings map[int32]configv2.ApisixUpstreamConfig
@@ -328,69 +336,27 @@ func (c *apisixUpstreamController) sync(ctx
context.Context, ev *types.Event) er
if len(au.Spec.Subsets) > 0 {
subsets = append(subsets, au.Spec.Subsets...)
}
- clusterName := c.Config.APISIX.DefaultClusterName
for _, port := range svc.Spec.Ports {
for _, subset := range subsets {
- // TODO: multiple cluster
- update := func(upsName string) error {
- ups, err :=
c.APISIX.Cluster(clusterName).Upstream().Get(ctx, upsName)
- if err != nil {
- if err ==
apisixcache.ErrNotFound {
- return nil
- }
- log.Errorf("failed to get
upstream %s: %s", upsName, err)
- c.RecordEvent(au,
corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au,
utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
- }
- var newUps *apisixv1.Upstream
- if ev.Type != types.EventDelete {
- cfg, ok :=
portLevelSettings[port.Port]
- if !ok {
- cfg =
au.Spec.ApisixUpstreamConfig
- }
- // FIXME Same
ApisixUpstreamConfig might be translated multiple times.
- newUps, err =
c.translator.TranslateUpstreamConfigV2(&cfg)
- if err != nil {
-
log.Errorw("ApisixUpstream conversion cannot be completed, or the format is
incorrect",
-
zap.Any("object", au),
- zap.Error(err),
- )
- c.RecordEvent(au,
corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au,
utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
- }
- } else {
- newUps =
apisixv1.NewDefaultUpstream()
- }
-
- newUps.Metadata = ups.Metadata
- newUps.Nodes = ups.Nodes
- log.Debugw("updating upstream since
ApisixUpstream changed",
- zap.String("event",
ev.Type.String()),
- zap.Any("upstream", newUps),
- zap.Any("ApisixUpstream", au),
- )
- if _, err :=
c.APISIX.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil {
- log.Errorw("failed to update
upstream",
- zap.Error(err),
- zap.Any("upstream",
newUps),
-
zap.Any("ApisixUpstream", au),
- zap.String("cluster",
clusterName),
- )
- c.RecordEvent(au,
corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au,
utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ var cfg configv2.ApisixUpstreamConfig
+ if ev.Type != types.EventDelete {
+ var ok bool
+ cfg, ok = portLevelSettings[port.Port]
+ if !ok {
+ cfg =
au.Spec.ApisixUpstreamConfig
}
- return nil
}
- err :=
update(apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port,
types.ResolveGranularity.Endpoint))
+ err := c.updateUpstream(ctx,
apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port,
types.ResolveGranularity.Endpoint), &cfg)
if err != nil {
+ c.RecordEvent(au,
corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
+ c.recordStatus(au,
utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
return err
}
- err =
update(apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port,
types.ResolveGranularity.Service))
+ err = c.updateUpstream(ctx,
apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port,
types.ResolveGranularity.Service), &cfg)
if err != nil {
+ c.RecordEvent(au,
corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
+ c.recordStatus(au,
utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
return err
}
}
@@ -404,13 +370,56 @@ func (c *apisixUpstreamController) sync(ctx
context.Context, ev *types.Event) er
return err
}
-func (c *apisixUpstreamController) updateExternalNodes(ctx context.Context, au
*configv2.ApisixUpstream, old *configv2.ApisixUpstream, newUps
*apisixv1.Upstream) error {
+func (c *apisixUpstreamController) updateUpstream(ctx context.Context, upsName
string, cfg *configv2.ApisixUpstreamConfig) error {
+ // TODO: multi cluster
clusterName := c.Config.APISIX.DefaultClusterName
- // TODO: if old is not nil, diff the external nodes change first
+ ups, err := c.APISIX.Cluster(clusterName).Upstream().Get(ctx, upsName)
+ if err != nil {
+ if err == apisixcache.ErrNotFound {
+ return nil
+ }
+ log.Errorf("failed to get upstream %s: %s", upsName, err)
+ return err
+ }
+ var newUps *apisixv1.Upstream
+ if cfg != nil {
+ newUps, err = c.translator.TranslateUpstreamConfigV2(cfg)
+ if err != nil {
+ log.Errorw("ApisixUpstream conversion cannot be
completed, or the format is incorrect",
+ zap.String("ApisixUpstream name", upsName),
+ zap.Error(err),
+ )
+ return err
+ }
+ } else {
+ newUps = apisixv1.NewDefaultUpstream()
+ }
+
+ newUps.Metadata = ups.Metadata
+ newUps.Nodes = ups.Nodes
+ log.Debugw("updating upstream since ApisixUpstream changed",
+ zap.Any("upstream", newUps),
+ zap.String("ApisixUpstream name", upsName),
+ )
+ if _, err := c.APISIX.Cluster(clusterName).Upstream().Update(ctx,
newUps); err != nil {
+ log.Errorw("failed to update upstream",
+ zap.Error(err),
+ zap.Any("upstream", newUps),
+ zap.String("ApisixUpstream name", upsName),
+ zap.String("cluster", clusterName),
+ )
+ return err
+ }
+ return nil
+}
- upsName := apisixv1.ComposeExternalUpstreamName(au.Namespace, au.Name)
+func (c *apisixUpstreamController) updateExternalNodes(ctx context.Context, au
*configv2.ApisixUpstream, old *configv2.ApisixUpstream, newUps
*apisixv1.Upstream, ns, name string) error {
+ clusterName := c.Config.APISIX.DefaultClusterName
+
+ // TODO: if old is not nil, diff the external nodes change first
+ upsName := apisixv1.ComposeExternalUpstreamName(ns, name)
ups, err := c.APISIX.Cluster(clusterName).Upstream().Get(ctx, upsName)
if err != nil {
if err != apisixcache.ErrNotFound {
@@ -786,7 +795,7 @@ func (c *apisixUpstreamController) handleSvcChange(ctx
context.Context, key stri
if err != nil {
return err
}
- err = c.updateExternalNodes(ctx, au.V2(), nil, nil)
+ err = c.updateExternalNodes(ctx, au.V2(), nil, nil, ns, name)
if err != nil {
return err
}
diff --git a/pkg/providers/apisix/translation/apisix_route.go
b/pkg/providers/apisix/translation/apisix_route.go
index f49fab4d..2d1d032b 100644
--- a/pkg/providers/apisix/translation/apisix_route.go
+++ b/pkg/providers/apisix/translation/apisix_route.go
@@ -257,7 +257,7 @@ func (t *translator) translateHTTPRouteV2(ctx
*translation.TranslateContext, ar
}
}
- // add KeyAuth and basicAuth plugin
+ // add Authentication plugins
if part.Authentication.Enable {
switch part.Authentication.Type {
case "keyAuth":
diff --git a/pkg/providers/apisix/translation/apisix_upstream.go
b/pkg/providers/apisix/translation/apisix_upstream.go
index e91dcfad..ee4eb89f 100644
--- a/pkg/providers/apisix/translation/apisix_upstream.go
+++ b/pkg/providers/apisix/translation/apisix_upstream.go
@@ -134,9 +134,9 @@ func (t *translator)
translateExternalApisixUpstream(namespace, upstream string)
}
au := multiVersioned.V2()
- if len(au.Spec.ExternalNodes) == 0 {
+ if len(au.Spec.ExternalNodes) == 0 && au.Spec.Discovery == nil {
// should do further resolve
- return nil, fmt.Errorf("%s/%s has empty ExternalNodes",
namespace, upstream)
+ return nil, fmt.Errorf("%s/%s has empty ExternalNodes or
Discovery configuration", namespace, upstream)
}
ups, err := t.TranslateUpstreamConfigV2(&au.Spec.ApisixUpstreamConfig)
@@ -146,12 +146,16 @@ func (t *translator)
translateExternalApisixUpstream(namespace, upstream string)
ups.Name = apisixv1.ComposeExternalUpstreamName(namespace, upstream)
ups.ID = id.GenID(ups.Name)
- externalNodes, err := t.TranslateApisixUpstreamExternalNodes(au)
- if err != nil {
- return nil, err
- }
+ // APISIX does not allow discovery_type and nodes to exist at the same
time.
+ //
https://github.com/apache/apisix/blob/01b4b49eb2ba642b337f7a1fbe1894a77942910b/apisix/schema_def.lua#L501-L504
+ if len(au.Spec.ExternalNodes) != 0 {
+ externalNodes, err := t.TranslateApisixUpstreamExternalNodes(au)
+ if err != nil {
+ return nil, err
+ }
- ups.Nodes = append(ups.Nodes, externalNodes...)
+ ups.Nodes = append(ups.Nodes, externalNodes...)
+ }
return ups, nil
}
diff --git a/pkg/providers/k8s/endpoint/endpointslice.go
b/pkg/providers/k8s/endpoint/endpointslice.go
index 10603d55..b920f6bf 100644
--- a/pkg/providers/k8s/endpoint/endpointslice.go
+++ b/pkg/providers/k8s/endpoint/endpointslice.go
@@ -239,7 +239,7 @@ func (c *endpointSliceController) onDelete(obj interface{})
{
// controller.
return
}
- log.Debugw("endpoints delete event arrived",
+ log.Debugw("endpointSlice delete event arrived",
zap.Any("object-key", key),
)
c.workqueue.Add(&types.Event{
diff --git a/pkg/providers/translation/apisix_upstream.go
b/pkg/providers/translation/apisix_upstream.go
index 9a57b566..1e708e1d 100644
--- a/pkg/providers/translation/apisix_upstream.go
+++ b/pkg/providers/translation/apisix_upstream.go
@@ -59,6 +59,9 @@ func (t *translator) TranslateUpstreamConfigV2(au
*configv2.ApisixUpstreamConfig
if err := t.translateClientTLSV2(au.TLSSecret, ups); err != nil {
return nil, err
}
+ if err := t.translateUpstreamDiscovery(au.Discovery, ups); err != nil {
+ return nil, err
+ }
return ups, nil
}
@@ -425,6 +428,17 @@ func (t *translator)
translateUpstreamRetriesAndTimeoutV2(retries *int, timeout
return nil
}
+func (t *translator) translateUpstreamDiscovery(discovery *configv2.Discovery,
ups *apisixv1.Upstream) error {
+ if discovery == nil {
+ return nil
+ }
+ ups.ServiceName = discovery.ServiceName
+ ups.DiscoveryType = discovery.Type
+ ups.DiscoveryArgs = discovery.Args
+
+ return nil
+}
+
func (t *translator) translateUpstreamLoadBalancerV2(lb
*configv2.LoadBalancer, ups *apisixv1.Upstream) error {
if lb == nil || lb.Type == "" {
ups.Type = apisixv1.LbRoundRobin
diff --git a/pkg/providers/types/types.go b/pkg/providers/types/types.go
index 5e8c852f..5ed99c0d 100644
--- a/pkg/providers/types/types.go
+++ b/pkg/providers/types/types.go
@@ -194,6 +194,10 @@ func (c *Common) SyncUpstreamNodesChangeToCluster(ctx
context.Context, cluster a
}
}
+ // Since APISIX's Upstream can support two modes:
+ // * Nodes
+ // * Service discovery
+ // When this logic is executed, the Nodes pattern is used.
upstream.Nodes = nodes
log.Debugw("upstream binds new nodes",
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index f57758b2..36caa318 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -197,6 +197,11 @@ type Upstream struct {
Retries *int `json:"retries,omitempty"
yaml:"retries,omitempty"`
Timeout *UpstreamTimeout `json:"timeout,omitempty"
yaml:"timeout,omitempty"`
TLS *ClientTLS `json:"tls,omitempty" yaml:"tls,omitempty"`
+
+ // for Service Discovery
+ ServiceName string `json:"service_name,omitempty"
yaml:"service_name,omitempty"`
+ DiscoveryType string `json:"discovery_type,omitempty"
yaml:"discovery_type,omitempty"`
+ DiscoveryArgs map[string]string `json:"discovery_args,omitempty"
yaml:"discovery_args,omitempty"`
}
// ClientTLS is tls cert and key use in mTLS
@@ -246,6 +251,83 @@ func (n *UpstreamNodes) UnmarshalJSON(p []byte) error {
return nil
}
+// MarshalJSON is used to implement custom json.MarshalJSON
+func (up Upstream) MarshalJSON() ([]byte, error) {
+
+ if up.DiscoveryType != "" {
+ return json.Marshal(&struct {
+ Metadata `json:",inline" yaml:",inline"`
+
+ Type string `json:"type,omitempty"
yaml:"type,omitempty"`
+ HashOn string `json:"hash_on,omitempty"
yaml:"hash_on,omitempty"`
+ Key string `json:"key,omitempty"
yaml:"key,omitempty"`
+ Checks *UpstreamHealthCheck `json:"checks,omitempty"
yaml:"checks,omitempty"`
+ //Nodes UpstreamNodes `json:"nodes"
yaml:"nodes"`
+ Scheme string `json:"scheme,omitempty"
yaml:"scheme,omitempty"`
+ Retries *int `json:"retries,omitempty"
yaml:"retries,omitempty"`
+ Timeout *UpstreamTimeout `json:"timeout,omitempty"
yaml:"timeout,omitempty"`
+ TLS *ClientTLS `json:"tls,omitempty"
yaml:"tls,omitempty"`
+
+ // for Service Discovery
+ ServiceName string
`json:"service_name,omitempty" yaml:"service_name,omitempty"`
+ DiscoveryType string
`json:"discovery_type,omitempty" yaml:"discovery_type,omitempty"`
+ DiscoveryArgs map[string]string
`json:"discovery_args,omitempty" yaml:"discovery_args,omitempty"`
+ }{
+ Metadata: up.Metadata,
+
+ Type: up.Type,
+ HashOn: up.HashOn,
+ Key: up.Key,
+ Checks: up.Checks,
+ //Nodes: up.Nodes,
+ Scheme: up.Scheme,
+ Retries: up.Retries,
+ Timeout: up.Timeout,
+ TLS: up.TLS,
+
+ ServiceName: up.ServiceName,
+ DiscoveryType: up.DiscoveryType,
+ DiscoveryArgs: up.DiscoveryArgs,
+ })
+ } else {
+ return json.Marshal(&struct {
+ Metadata `json:",inline" yaml:",inline"`
+
+ Type string `json:"type,omitempty"
yaml:"type,omitempty"`
+ HashOn string `json:"hash_on,omitempty"
yaml:"hash_on,omitempty"`
+ Key string `json:"key,omitempty"
yaml:"key,omitempty"`
+ Checks *UpstreamHealthCheck `json:"checks,omitempty"
yaml:"checks,omitempty"`
+ Nodes UpstreamNodes `json:"nodes" yaml:"nodes"`
+ Scheme string `json:"scheme,omitempty"
yaml:"scheme,omitempty"`
+ Retries *int `json:"retries,omitempty"
yaml:"retries,omitempty"`
+ Timeout *UpstreamTimeout `json:"timeout,omitempty"
yaml:"timeout,omitempty"`
+ TLS *ClientTLS `json:"tls,omitempty"
yaml:"tls,omitempty"`
+
+ // for Service Discovery
+ //ServiceName string
`json:"service_name,omitempty" yaml:"service_name,omitempty"`
+ //DiscoveryType string
`json:"discovery_type,omitempty" yaml:"discovery_type,omitempty"`
+ //DiscoveryArgs map[string]string
`json:"discovery_args,omitempty" yaml:"discovery_args,omitempty"`
+ }{
+ Metadata: up.Metadata,
+
+ Type: up.Type,
+ HashOn: up.HashOn,
+ Key: up.Key,
+ Checks: up.Checks,
+ Nodes: up.Nodes,
+ Scheme: up.Scheme,
+ Retries: up.Retries,
+ Timeout: up.Timeout,
+ TLS: up.TLS,
+
+ //ServiceName: up.ServiceName,
+ //DiscoveryType: up.DiscoveryType,
+ //DiscoveryArgs: up.DiscoveryArgs,
+ })
+ }
+
+}
+
func mapKV2Node(key string, val float64) (*UpstreamNode, error) {
hp := strings.Split(key, ":")
host := hp[0]
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go
b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index f09f3097..f0981722 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -606,6 +606,13 @@ func (in *Upstream) DeepCopyInto(out *Upstream) {
*out = new(ClientTLS)
**out = **in
}
+ if in.DiscoveryArgs != nil {
+ in, out := &in.DiscoveryArgs, &out.DiscoveryArgs
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
return
}
diff --git a/samples/deploy/crd/v1/ApisixUpstream.yaml
b/samples/deploy/crd/v1/ApisixUpstream.yaml
index 27d6ef83..337cff2a 100644
--- a/samples/deploy/crd/v1/ApisixUpstream.yaml
+++ b/samples/deploy/crd/v1/ApisixUpstream.yaml
@@ -413,6 +413,17 @@ spec:
spec:
type: object
properties:
+ discovery:
+ description: Discovery is used to configure service
discovery for upstream
+ type: object
+ properties:
+ serviceName:
+ type: string
+ type:
+ type: string
+ args:
+ type: object
+ x-kubernetes-preserve-unknown-fields: true
externalNodes:
description: ExternalNodes contains external nodes the
Upstream should use If this field is set, the upstream will use these nodes
directly without any further resolves
type: array
diff --git a/test/e2e/suite-features/external-sd.go
b/test/e2e/suite-features/external-sd.go
new file mode 100644
index 00000000..b93ee531
--- /dev/null
+++ b/test/e2e/suite-features/external-sd.go
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package features
+
+import (
+ "fmt"
+ "net/http"
+ "os"
+ "time"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/apisix-ingress-controller/pkg/config"
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("suite-features: external service discovery", func() {
+
+ PhaseCreateApisixRoute := func(s *scaffold.Scaffold, name, upstream
string) {
+ ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: %s
+spec:
+ http:
+ - name: rule1
+ match:
+ hosts:
+ - httpbin.org
+ paths:
+ - /*
+ exprs:
+ - subject:
+ scope: Header
+ name: X-Foo
+ op: Equal
+ value: bar
+ upstreams:
+ - name: %s
+`, name, upstream)
+ assert.Nil(ginkgo.GinkgoT(),
s.CreateVersionedApisixResource(ar))
+ }
+
+ PhaseCreateApisixUpstream := func(s *scaffold.Scaffold, name,
discoveryType, serviceName string) {
+ au := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixUpstream
+metadata:
+ name: %s
+spec:
+ discovery:
+ type: %s
+ serviceName: %s
+`, name, discoveryType, fmt.Sprintf("%s.%s.svc.cluster.local", serviceName,
s.Namespace()))
+ assert.Nil(ginkgo.GinkgoT(),
s.CreateVersionedApisixResource(au))
+ }
+
+ PhaseValidateNoUpstreams := func(s *scaffold.Scaffold) {
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, 0, "upstream count")
+ }
+
+ PhaseValidateNoRoutes := func(s *scaffold.Scaffold) {
+ routes, err := s.ListApisixRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), routes, 0, "route count")
+ }
+
+ PhaseValidateFirstUpstream := func(s *scaffold.Scaffold, length int,
serviceName, discoveryType string) string {
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, length, "upstream count")
+ upstream := ups[0]
+ assert.Equal(ginkgo.GinkgoT(), serviceName,
upstream.ServiceName)
+ assert.Equal(ginkgo.GinkgoT(), discoveryType,
upstream.DiscoveryType)
+
+ return upstream.ID
+ }
+
+ PhaseValidateRouteAccess := func(s *scaffold.Scaffold, upstreamId
string) {
+ routes, err := s.ListApisixRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), routes, 1, "route count")
+ assert.Equal(ginkgo.GinkgoT(), upstreamId, routes[0].UpstreamId)
+
+ _ = s.NewAPISIXClient().GET("/ip").
+ WithHeader("Host", "httpbin.org").
+ WithHeader("X-Foo", "bar").
+ Expect().
+ Status(http.StatusOK)
+ }
+
+ //PhaseValidateRouteAccessCode := func(s *scaffold.Scaffold, upstreamId
string, code int) {
+ //routes, err := s.ListApisixRoutes()
+ //assert.Nil(ginkgo.GinkgoT(), err)
+ //assert.Len(ginkgo.GinkgoT(), routes, 1, "route count")
+ //assert.Equal(ginkgo.GinkgoT(), upstreamId, routes[0].UpstreamId)
+
+ //_ = s.NewAPISIXClient().GET("/ip").
+ //WithHeader("Host", "httpbin.org").
+ //WithHeader("X-Foo", "bar").
+ //Expect().
+ //Status(code)
+ //}
+
+ PhaseCreateHttpbin := func(s *scaffold.Scaffold, name string) string {
+ _httpbinDeploymentTemplate := fmt.Sprintf(`
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: %s
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: %s
+ strategy:
+ rollingUpdate:
+ maxSurge: 50%%
+ maxUnavailable: 1
+ type: RollingUpdate
+ template:
+ metadata:
+ labels:
+ app: %s
+ spec:
+ terminationGracePeriodSeconds: 0
+ containers:
+ - livenessProbe:
+ failureThreshold: 3
+ initialDelaySeconds: 2
+ periodSeconds: 5
+ successThreshold: 1
+ tcpSocket:
+ port: 80
+ timeoutSeconds: 2
+ readinessProbe:
+ failureThreshold: 3
+ initialDelaySeconds: 2
+ periodSeconds: 5
+ successThreshold: 1
+ tcpSocket:
+ port: 80
+ timeoutSeconds: 2
+ image: "localhost:5000/kennethreitz/httpbin:dev"
+ imagePullPolicy: IfNotPresent
+ name: httpbin
+ ports:
+ - containerPort: 80
+ name: "http"
+ protocol: "TCP"
+`, name, name, name)
+ _httpService := fmt.Sprintf(`
+apiVersion: v1
+kind: Service
+metadata:
+ name: %s
+spec:
+ selector:
+ app: %s
+ ports:
+ - name: http
+ port: 80
+ protocol: TCP
+ targetPort: 80
+ type: ClusterIP
+`, name, name)
+
+ err :=
s.CreateResourceFromString(s.FormatRegistry(_httpbinDeploymentTemplate))
+ assert.Nil(ginkgo.GinkgoT(), err, "create temp httpbin
deployment")
+ assert.Nil(ginkgo.GinkgoT(),
s.CreateResourceFromString(_httpService), "create temp httpbin service")
+
+ return fmt.Sprintf("httpbin-temp.%s.svc.cluster.local",
s.Namespace())
+ }
+
+ // Cases:
+ // --- Basic Function ---
+ // 1. ApisixRoute refers to ApisixUpstream, ApisixUpstream refers to
service discovery
+ // 2. ApisixRoute refers to ApisixUpstream and Backends, ApisixUpstream
refers to service discovery
+ // --- Update Cases ---
+ // o 1. ApisixRoute refers to ApisixUpstream, but the ApisixUpstream is
created later
+ // --- Delete Cases ---
+ // 1. ApisixRoute is deleted, the generated resources should be removed
+
+ opts := &scaffold.Options{
+ Name: "default",
+ IngressAPISIXReplicas: 1,
+ ApisixResourceVersion: config.ApisixV2,
+ }
+
+ adminVersion := os.Getenv("APISIX_ADMIN_API_VERSION")
+ if adminVersion == "v3" {
+ opts.APISIXConfigPath =
"testdata/apisix-gw-config-v3-with-sd.yaml"
+ } else {
+ // fallback to v2
+ opts.APISIXConfigPath = "testdata/apisix-gw-config-with-sd.yaml"
+ }
+
+ s := scaffold.NewScaffold(opts)
+
+ ginkgo.Describe("basic function: ", func() {
+ ginkgo.It("should be able to access through service discovery",
func() {
+ // -- Data preparation --
+ fqdn := PhaseCreateHttpbin(s, "httpbin-temp")
+ // After creating a Service, a record will be added in
DNS.
+ // We use it for service discovery
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", "dns",
"httpbin-temp")
+ PhaseCreateApisixRoute(s, "httpbin-route",
"httpbin-upstream")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn,
"dns")
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ })
+
+ ginkgo.Describe("update function: ", func() {
+ ginkgo.It("should be able to create the ApisixUpstream later",
func() {
+ // -- Data preparation --
+ fqdn := PhaseCreateHttpbin(s, "httpbin-temp")
+ PhaseCreateApisixRoute(s, "httpbin-route",
"httpbin-upstream")
+ PhaseValidateNoUpstreams(s)
+
+ // -- Data Update --
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", "dns",
"httpbin-temp")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn,
"dns")
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+
+ ginkgo.It("should be able to create the target service later",
func() {
+ // -- Data preparation --
+ PhaseCreateApisixRoute(s, "httpbin-route",
"httpbin-upstream")
+ PhaseValidateNoUpstreams(s)
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", "dns",
"httpbin-temp")
+
+ // -- Data Update --
+ fqdn := PhaseCreateHttpbin(s, "httpbin-temp")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn,
"dns")
+ PhaseValidateRouteAccess(s, upstreamId)
+ })
+ })
+
+ ginkgo.Describe("delete function: ", func() {
+ ginkgo.It("should be able to delete resources", func() {
+ // -- Data preparation --
+ fqdn := PhaseCreateHttpbin(s, "httpbin-temp")
+ PhaseCreateApisixUpstream(s, "httpbin-upstream", "dns",
"httpbin-temp")
+ PhaseCreateApisixRoute(s, "httpbin-route",
"httpbin-upstream")
+
+ // -- validation --
+ upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn,
"dns")
+ PhaseValidateRouteAccess(s, upstreamId)
+
+ // -- delete --
+ assert.Nil(ginkgo.GinkgoT(), s.DeleteResource("ar",
"httpbin-route"), "delete route")
+ assert.Nil(ginkgo.GinkgoT(), s.DeleteResource("au",
"httpbin-upstream"), "delete upstream")
+ time.Sleep(time.Second * 15)
+
+ // -- validate --
+ PhaseValidateNoRoutes(s)
+ PhaseValidateNoUpstreams(s)
+ })
+ })
+
+})
diff --git a/test/e2e/testdata/apisix-gw-config-v3-with-sd.yaml
b/test/e2e/testdata/apisix-gw-config-v3-with-sd.yaml
new file mode 100644
index 00000000..941fc206
--- /dev/null
+++ b/test/e2e/testdata/apisix-gw-config-v3-with-sd.yaml
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# PLEASE DO NOT UPDATE THIS FILE!
+# If you want to set the specified configuration value, you can set the new
+# value in the conf/config.yaml file.
+#
+
+deployment:
+ admin:
+ allow_admin:
+ - 127.0.0.0/24
+ - 0.0.0.0/0
+ admin_listen:
+ ip: 0.0.0.0
+ port: 9180
+ etcd:
+ host:
+ - "http://{{ .EtcdServiceFQDN }}:2379"
+ prefix: "/apisix"
+ timeout: 30
+
+apisix:
+ enable_control: true
+ enable_reuseport: true
+
+ stream_proxy:
+ only: false
+ tcp:
+ - 9100
+ - addr: 9110
+ tls: true
+ udp:
+ - 9200
+
+plugin_attr:
+ prometheus:
+ enable_export_server: false
+
+discovery:
+ dns:
+ servers:
+ - "10.96.0.10:53" # use the real address of your dns server.
+ # currently we use KIND as the standard test
environment, so here we can hard-code the default DNS address first.
+ # TODO: can be modified to fill dynamically
diff --git a/test/e2e/testdata/apisix-gw-config-with-sd.yaml
b/test/e2e/testdata/apisix-gw-config-with-sd.yaml
new file mode 100644
index 00000000..8d1ef5d0
--- /dev/null
+++ b/test/e2e/testdata/apisix-gw-config-with-sd.yaml
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# PLEASE DO NOT UPDATE THIS FILE!
+# If you want to set the specified configuration value, you can set the new
+# value in the conf/config.yaml file.
+#
+
+apisix:
+ enable_control: true
+ enable_reuseport: true # Enable nginx SO_REUSEPORT switch if set to
true.
+ allow_admin:
+ - 127.0.0.0/24
+ - 0.0.0.0/0
+ port_admin: 9180
+ stream_proxy: # TCP/UDP proxy
+ only: false
+ tcp: # TCP proxy port list
+ - 9100
+ - addr: 9110
+ tls: true
+ udp:
+ - 9200
+etcd:
+ host: # it's possible to define multiple etcd
hosts addresses of the same etcd cluster.
+ - "http://{{ .EtcdServiceFQDN }}:2379" # multiple etcd address
+ prefix: "/apisix" # apisix configurations prefix
+ timeout: 30 # 30 seconds
+plugin_attr:
+ prometheus:
+ enable_export_server: false
+
+discovery:
+ dns:
+ servers:
+ - "10.96.0.10:53" # use the real address of your dns server.
+ # currently we use KIND as the standard test
environment, so here we can hard-code the default DNS address first.
+ # TODO: can be modified to fill dynamically