This is an automated email from the ASF dual-hosted git repository.
ashishtiwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 68664908 feat(gateway-api): add support for UDPRoute (#2578)
68664908 is described below
commit 686649083da6a2cd7f063485ccbe6190a735be1c
Author: Ashish Tiwari <[email protected]>
AuthorDate: Mon Sep 29 10:36:01 2025 +0530
feat(gateway-api): add support for UDPRoute (#2578)
---
api/adc/types.go | 14 +-
config/rbac/role.yaml | 4 +-
docs/en/latest/concepts/gateway-api.md | 2 +-
internal/adc/translator/apisixroute.go | 4 +-
internal/adc/translator/tcproute.go | 4 +-
.../adc/translator/{tcproute.go => udproute.go} | 23 +-
internal/controller/indexer/indexer.go | 54 +++
internal/controller/udproute_controller.go | 505 +++++++++++++++++++++
internal/controller/utils.go | 9 +-
internal/manager/controllers.go | 11 +
internal/provider/apisix/provider.go | 8 +-
internal/provider/apisix/status.go | 36 ++
internal/types/k8s.go | 5 +
test/e2e/framework/manifests/ingress.yaml | 2 +
test/e2e/gatewayapi/udproute.go | 114 +++++
15 files changed, 761 insertions(+), 34 deletions(-)
diff --git a/api/adc/types.go b/api/adc/types.go
index 6b2bec95..48abd10f 100644
--- a/api/adc/types.go
+++ b/api/adc/types.go
@@ -492,10 +492,13 @@ func ComposeRouteName(namespace, name string, rule
string) string {
// ComposeStreamRouteName uses namespace, name and rule name to compose
// the stream_route name.
-func ComposeStreamRouteName(namespace, name string, rule string) string {
+func ComposeStreamRouteName(namespace, name string, rule string, typ string)
string {
+ if typ == "" {
+ typ = "TCP"
+ }
// FIXME Use sync.Pool to reuse this buffer if the upstream
// name composing code path is hot.
- p := make([]byte, 0, len(namespace)+len(name)+len(rule)+6)
+ p := make([]byte, 0, len(namespace)+len(name)+len(rule)+len(typ)+3)
buf := bytes.NewBuffer(p)
buf.WriteString(namespace)
@@ -503,7 +506,8 @@ func ComposeStreamRouteName(namespace, name string, rule
string) string {
buf.WriteString(name)
buf.WriteByte('_')
buf.WriteString(rule)
- buf.WriteString("_tcp")
+ buf.WriteByte('_')
+ buf.WriteString(typ)
return buf.String()
}
@@ -546,8 +550,8 @@ func ComposeServicesNameWithScheme(namespace, name string,
rule string, scheme s
return buf.String()
}
-func ComposeServiceNameWithStream(namespace, name string, rule string) string {
- return ComposeServicesNameWithScheme(namespace, name, rule, "stream")
+func ComposeServiceNameWithStream(namespace, name string, rule, typ string)
string {
+ return ComposeServicesNameWithScheme(namespace, name, rule, typ)
}
func ComposeConsumerName(namespace, name string) string {
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 0caeaea9..856bf7db 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -93,6 +93,7 @@ rules:
- httproutes/status
- referencegrants/status
- tcproutes/status
+ - udproutes/status
verbs:
- get
- update
@@ -102,8 +103,9 @@ rules:
- gateways
- grpcroutes
- httproutes
- - tcproutes
- referencegrants
+ - tcproutes
+ - udproutes
verbs:
- get
- list
diff --git a/docs/en/latest/concepts/gateway-api.md
b/docs/en/latest/concepts/gateway-api.md
index 20240f10..4621dfe9 100644
--- a/docs/en/latest/concepts/gateway-api.md
+++ b/docs/en/latest/concepts/gateway-api.md
@@ -52,7 +52,7 @@ By supporting Gateway API, the APISIX Ingress controller can
realize richer func
| ReferenceGrant | Supported | Not supported | Not
supported | v1beta1 |
| TLSRoute | Not supported | Not supported | Not
supported | v1alpha2 |
| TCPRoute | Supported | Supported | Not
supported | v1alpha2 |
-| UDPRoute | Not supported | Not supported | Not
supported | v1alpha2 |
+| UDPRoute | Supported | Supported | Not
supported | v1alpha2 |
| BackendTLSPolicy | Not supported | Not supported | Not
supported | v1alpha3 |
## Examples
diff --git a/internal/adc/translator/apisixroute.go
b/internal/adc/translator/apisixroute.go
index d73f9981..0fcf4424 100644
--- a/internal/adc/translator/apisixroute.go
+++ b/internal/adc/translator/apisixroute.go
@@ -422,14 +422,14 @@ func (t *Translator) translateStreamRule(tctx
*provider.TranslateContext, ar *ap
t.loadRoutePlugins(tctx, ar, part.Plugins, plugins)
sr := adc.NewDefaultStreamRoute()
- sr.Name = adc.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
+ sr.Name = adc.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name,
part.Protocol)
sr.ID = id.GenID(sr.Name)
sr.ServerPort = part.Match.IngressPort
sr.SNI = part.Match.Host
sr.Plugins = plugins
svc := adc.NewDefaultService()
- svc.Name = adc.ComposeServiceNameWithStream(ar.Namespace, ar.Name,
part.Name)
+ svc.Name = adc.ComposeServiceNameWithStream(ar.Namespace, ar.Name,
part.Name, part.Protocol)
svc.ID = id.GenID(svc.Name)
svc.StreamRoutes = append(svc.StreamRoutes, sr)
diff --git a/internal/adc/translator/tcproute.go
b/internal/adc/translator/tcproute.go
index 5c97f2dc..ffef927c 100644
--- a/internal/adc/translator/tcproute.go
+++ b/internal/adc/translator/tcproute.go
@@ -49,7 +49,7 @@ func (t *Translator) TranslateTCPRoute(tctx
*provider.TranslateContext, tcpRoute
for ruleIndex, rule := range rules {
service := adctypes.NewDefaultService()
service.Labels = labels
- service.Name =
adctypes.ComposeServiceNameWithStream(tcpRoute.Namespace, tcpRoute.Name,
fmt.Sprintf("%d", ruleIndex))
+ service.Name =
adctypes.ComposeServiceNameWithStream(tcpRoute.Namespace, tcpRoute.Name,
fmt.Sprintf("%d", ruleIndex), "TCP")
service.ID = id.GenID(service.Name)
var (
upstreams = make([]*adctypes.Upstream, 0)
@@ -151,7 +151,7 @@ func (t *Translator) TranslateTCPRoute(tctx
*provider.TranslateContext, tcpRoute
}
}
streamRoute := adctypes.NewDefaultStreamRoute()
- streamRouteName :=
adctypes.ComposeStreamRouteName(tcpRoute.Namespace, tcpRoute.Name,
fmt.Sprintf("%d", ruleIndex))
+ streamRouteName :=
adctypes.ComposeStreamRouteName(tcpRoute.Namespace, tcpRoute.Name,
fmt.Sprintf("%d", ruleIndex), "TCP")
streamRoute.Name = streamRouteName
streamRoute.ID = id.GenID(streamRouteName)
streamRoute.Labels = labels
diff --git a/internal/adc/translator/tcproute.go
b/internal/adc/translator/udproute.go
similarity index 87%
copy from internal/adc/translator/tcproute.go
copy to internal/adc/translator/udproute.go
index 5c97f2dc..983130cb 100644
--- a/internal/adc/translator/tcproute.go
+++ b/internal/adc/translator/udproute.go
@@ -31,25 +31,14 @@ import (
"github.com/apache/apisix-ingress-controller/internal/types"
)
-func newDefaultUpstreamWithoutScheme() *adctypes.Upstream {
- return &adctypes.Upstream{
- Metadata: adctypes.Metadata{
- Labels: map[string]string{
- "managed-by": "apisix-ingress-controller",
- },
- },
- Nodes: make(adctypes.UpstreamNodes, 0),
- }
-}
-
-func (t *Translator) TranslateTCPRoute(tctx *provider.TranslateContext,
tcpRoute *gatewayv1alpha2.TCPRoute) (*TranslateResult, error) {
+func (t *Translator) TranslateUDPRoute(tctx *provider.TranslateContext,
udpRoute *gatewayv1alpha2.UDPRoute) (*TranslateResult, error) {
result := &TranslateResult{}
- rules := tcpRoute.Spec.Rules
- labels := label.GenLabel(tcpRoute)
+ rules := udpRoute.Spec.Rules
+ labels := label.GenLabel(udpRoute)
for ruleIndex, rule := range rules {
service := adctypes.NewDefaultService()
service.Labels = labels
- service.Name =
adctypes.ComposeServiceNameWithStream(tcpRoute.Namespace, tcpRoute.Name,
fmt.Sprintf("%d", ruleIndex))
+ service.Name =
adctypes.ComposeServiceNameWithStream(udpRoute.Namespace, udpRoute.Name,
fmt.Sprintf("%d", ruleIndex), "UDP")
service.ID = id.GenID(service.Name)
var (
upstreams = make([]*adctypes.Upstream, 0)
@@ -57,7 +46,7 @@ func (t *Translator) TranslateTCPRoute(tctx
*provider.TranslateContext, tcpRoute
)
for _, backend := range rule.BackendRefs {
if backend.Namespace == nil {
- namespace :=
gatewayv1.Namespace(tcpRoute.Namespace)
+ namespace :=
gatewayv1.Namespace(udpRoute.Namespace)
backend.Namespace = &namespace
}
upstream := newDefaultUpstreamWithoutScheme()
@@ -151,7 +140,7 @@ func (t *Translator) TranslateTCPRoute(tctx
*provider.TranslateContext, tcpRoute
}
}
streamRoute := adctypes.NewDefaultStreamRoute()
- streamRouteName :=
adctypes.ComposeStreamRouteName(tcpRoute.Namespace, tcpRoute.Name,
fmt.Sprintf("%d", ruleIndex))
+ streamRouteName :=
adctypes.ComposeStreamRouteName(udpRoute.Namespace, udpRoute.Name,
fmt.Sprintf("%d", ruleIndex), "UDP")
streamRoute.Name = streamRouteName
streamRoute.ID = id.GenID(streamRouteName)
streamRoute.Labels = labels
diff --git a/internal/controller/indexer/indexer.go
b/internal/controller/indexer/indexer.go
index f21a5f06..ef3da206 100644
--- a/internal/controller/indexer/indexer.go
+++ b/internal/controller/indexer/indexer.go
@@ -57,6 +57,7 @@ func SetupIndexer(mgr ctrl.Manager) error {
setupGatewayIndexer,
setupHTTPRouteIndexer,
setupTCPRouteIndexer,
+ setupUDPRouteIndexer,
setupGRPCRouteIndexer,
setupIngressIndexer,
setupConsumerIndexer,
@@ -252,6 +253,28 @@ func setupTCPRouteIndexer(mgr ctrl.Manager) error {
}
return nil
}
+
+func setupUDPRouteIndexer(mgr ctrl.Manager) error {
+ if err := mgr.GetFieldIndexer().IndexField(
+ context.Background(),
+ &gatewayv1alpha2.UDPRoute{},
+ ParentRefs,
+ UDPRouteParentRefsIndexFunc,
+ ); err != nil {
+ return err
+ }
+
+ if err := mgr.GetFieldIndexer().IndexField(
+ context.Background(),
+ &gatewayv1alpha2.UDPRoute{},
+ ServiceIndexRef,
+ UDPRouteServiceIndexFunc,
+ ); err != nil {
+ return err
+ }
+ return nil
+}
+
func setupIngressClassIndexer(mgr ctrl.Manager) error {
// create IngressClass index
if err := mgr.GetFieldIndexer().IndexField(
@@ -517,6 +540,19 @@ func TCPRouteParentRefsIndexFunc(rawObj client.Object)
[]string {
return keys
}
+func UDPRouteParentRefsIndexFunc(rawObj client.Object) []string {
+ ur := rawObj.(*gatewayv1alpha2.UDPRoute)
+ keys := make([]string, 0, len(ur.Spec.ParentRefs))
+ for _, ref := range ur.Spec.ParentRefs {
+ ns := ur.GetNamespace()
+ if ref.Namespace != nil {
+ ns = string(*ref.Namespace)
+ }
+ keys = append(keys, GenIndexKey(ns, string(ref.Name)))
+ }
+ return keys
+}
+
func HTTPRouteServiceIndexFunc(rawObj client.Object) []string {
hr := rawObj.(*gatewayv1.HTTPRoute)
keys := make([]string, 0, len(hr.Spec.Rules))
@@ -553,6 +589,24 @@ func TCPPRouteServiceIndexFunc(rawObj client.Object)
[]string {
return keys
}
+func UDPRouteServiceIndexFunc(rawObj client.Object) []string {
+ ur := rawObj.(*gatewayv1alpha2.UDPRoute)
+ keys := make([]string, 0, len(ur.Spec.Rules))
+ for _, rule := range ur.Spec.Rules {
+ for _, backend := range rule.BackendRefs {
+ namespace := ur.GetNamespace()
+ if backend.Kind != nil && *backend.Kind !=
internaltypes.KindService {
+ continue
+ }
+ if backend.Namespace != nil {
+ namespace = string(*backend.Namespace)
+ }
+ keys = append(keys, GenIndexKey(namespace,
string(backend.Name)))
+ }
+ }
+ return keys
+}
+
func ApisixRouteServiceIndexFunc(cli client.Client) func(client.Object)
[]string {
return func(obj client.Object) (keys []string) {
ar := obj.(*apiv2.ApisixRoute)
diff --git a/internal/controller/udproute_controller.go
b/internal/controller/udproute_controller.go
new file mode 100644
index 00000000..88b11367
--- /dev/null
+++ b/internal/controller/udproute_controller.go
@@ -0,0 +1,505 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package controller
+
+import (
+ "cmp"
+ "context"
+ "fmt"
+
+ "github.com/go-logr/logr"
+ corev1 "k8s.io/api/core/v1"
+ discoveryv1 "k8s.io/api/discovery/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ k8stypes "k8s.io/apimachinery/pkg/types"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/builder"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/predicate"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
+ gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
+ "sigs.k8s.io/gateway-api/apis/v1beta1"
+
+ "github.com/apache/apisix-ingress-controller/api/v1alpha1"
+
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
+ "github.com/apache/apisix-ingress-controller/internal/controller/status"
+ "github.com/apache/apisix-ingress-controller/internal/manager/readiness"
+ "github.com/apache/apisix-ingress-controller/internal/provider"
+ "github.com/apache/apisix-ingress-controller/internal/types"
+ "github.com/apache/apisix-ingress-controller/internal/utils"
+)
+
+// UDPRouteReconciler reconciles a UDPRoute object.
+type UDPRouteReconciler struct { //nolint:revive
+ client.Client
+ Scheme *runtime.Scheme
+
+ Log logr.Logger
+
+ Provider provider.Provider
+
+ Updater status.Updater
+ Readier readiness.ReadinessManager
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
+
+ bdr := ctrl.NewControllerManagedBy(mgr).
+ For(&gatewayv1alpha2.UDPRoute{}).
+ WithEventFilter(predicate.GenerationChangedPredicate{}).
+ Watches(&discoveryv1.EndpointSlice{},
+
handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesByServiceRef),
+ ).
+ Watches(&gatewayv1.Gateway{},
+
handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForGateway),
+ builder.WithPredicates(
+ predicate.Funcs{
+ GenericFunc: func(e event.GenericEvent)
bool {
+ return false
+ },
+ DeleteFunc: func(e event.DeleteEvent)
bool {
+ return false
+ },
+ CreateFunc: func(e event.CreateEvent)
bool {
+ return true
+ },
+ UpdateFunc: func(e event.UpdateEvent)
bool {
+ return true
+ },
+ },
+ ),
+ ).
+ Watches(&v1alpha1.BackendTrafficPolicy{},
+
handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForBackendTrafficPolicy),
+ ).
+ Watches(&v1alpha1.GatewayProxy{},
+
handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForGatewayProxy),
+ )
+
+ if GetEnableReferenceGrant() {
+ bdr.Watches(&v1beta1.ReferenceGrant{},
+
handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForReferenceGrant),
+
builder.WithPredicates(referenceGrantPredicates(KindUDPRoute)),
+ )
+ }
+
+ return bdr.Complete(r)
+}
+
+func (r *UDPRouteReconciler) listUDPRoutesForBackendTrafficPolicy(ctx
context.Context, obj client.Object) []reconcile.Request {
+ policy, ok := obj.(*v1alpha1.BackendTrafficPolicy)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to BackendTrafficPolicy")
+ return nil
+ }
+
+ udprouteList := []gatewayv1alpha2.UDPRoute{}
+ for _, targetRef := range policy.Spec.TargetRefs {
+ service := &corev1.Service{}
+ if err := r.Get(ctx, client.ObjectKey{
+ Namespace: policy.Namespace,
+ Name: string(targetRef.Name),
+ }, service); err != nil {
+ if client.IgnoreNotFound(err) != nil {
+ r.Log.Error(err, "failed to get service",
"namespace", policy.Namespace, "name", targetRef.Name)
+ }
+ continue
+ }
+ udprList := &gatewayv1alpha2.UDPRouteList{}
+ if err := r.List(ctx, udprList, client.MatchingFields{
+ indexer.ServiceIndexRef:
indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)),
+ }); err != nil {
+ r.Log.Error(err, "failed to list udproutes by service
reference", "service", targetRef.Name)
+ return nil
+ }
+ udprouteList = append(udprouteList, udprList.Items...)
+ }
+ var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{})
+ requests := make([]reconcile.Request, 0, len(udprouteList))
+ for _, tr := range udprouteList {
+ key := k8stypes.NamespacedName{
+ Namespace: tr.Namespace,
+ Name: tr.Name,
+ }
+ if _, ok := namespacedNameMap[key]; !ok {
+ namespacedNameMap[key] = struct{}{}
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{
+ Namespace: tr.Namespace,
+ Name: tr.Name,
+ },
+ })
+ }
+ }
+ return requests
+}
+
+func (r *UDPRouteReconciler) listUDPRoutesForGateway(ctx context.Context, obj
client.Object) []reconcile.Request {
+ gateway, ok := obj.(*gatewayv1.Gateway)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to Gateway")
+ }
+ udprList := &gatewayv1alpha2.UDPRouteList{}
+ if err := r.List(ctx, udprList, client.MatchingFields{
+ indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace,
gateway.Name),
+ }); err != nil {
+ r.Log.Error(err, "failed to list udproutes by gateway",
"gateway", gateway.Name)
+ return nil
+ }
+
+ requests := make([]reconcile.Request, 0, len(udprList.Items))
+ for _, tcr := range udprList.Items {
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{
+ Namespace: tcr.Namespace,
+ Name: tcr.Name,
+ },
+ })
+ }
+ return requests
+}
+
+// listUDPRoutesForGatewayProxy list all UDPRoute resources that are affected
by a given GatewayProxy
+func (r *UDPRouteReconciler) listUDPRoutesForGatewayProxy(ctx context.Context,
obj client.Object) []reconcile.Request {
+ gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to GatewayProxy")
+ return nil
+ }
+
+ namespace := gatewayProxy.GetNamespace()
+ name := gatewayProxy.GetName()
+
+ // find all gateways that reference this gateway proxy
+ gatewayList := &gatewayv1.GatewayList{}
+ if err := r.List(ctx, gatewayList, client.MatchingFields{
+ indexer.ParametersRef: indexer.GenIndexKey(namespace, name),
+ }); err != nil {
+ r.Log.Error(err, "failed to list gateways for gateway proxy",
"gatewayproxy", gatewayProxy.GetName())
+ return nil
+ }
+
+ var requests []reconcile.Request
+
+ // for each gateway, find all UDPRoute resources that reference it
+ for _, gateway := range gatewayList.Items {
+ udpRouteList := &gatewayv1alpha2.UDPRouteList{}
+ if err := r.List(ctx, udpRouteList, client.MatchingFields{
+ indexer.ParentRefs:
indexer.GenIndexKey(gateway.Namespace, gateway.Name),
+ }); err != nil {
+ r.Log.Error(err, "failed to list udproutes for
gateway", "gateway", gateway.Name)
+ continue
+ }
+
+ for _, udpRoute := range udpRouteList.Items {
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{
+ Namespace: udpRoute.Namespace,
+ Name: udpRoute.Name,
+ },
+ })
+ }
+ }
+
+ return requests
+}
+
+func (r *UDPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request)
(ctrl.Result, error) {
+ defer r.Readier.Done(&gatewayv1alpha2.UDPRoute{}, req.NamespacedName)
+ tr := new(gatewayv1alpha2.UDPRoute)
+ if err := r.Get(ctx, req.NamespacedName, tr); err != nil {
+ if client.IgnoreNotFound(err) == nil {
+ tr.Namespace = req.Namespace
+ tr.Name = req.Name
+
+ tr.TypeMeta = metav1.TypeMeta{
+ Kind: KindUDPRoute,
+ APIVersion:
gatewayv1alpha2.GroupVersion.String(),
+ }
+
+ if err := r.Provider.Delete(ctx, tr); err != nil {
+ r.Log.Error(err, "failed to delete udproute",
"udproute", tr)
+ return ctrl.Result{}, err
+ }
+ return ctrl.Result{}, nil
+ }
+ return ctrl.Result{}, err
+ }
+
+ type ResourceStatus struct {
+ status bool
+ msg string
+ }
+
+ acceptStatus := ResourceStatus{
+ status: true,
+ msg: "Route is accepted",
+ }
+
+ gateways, err := ParseRouteParentRefs(ctx, r.Client, tr,
tr.Spec.ParentRefs)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
+ if len(gateways) == 0 {
+ return ctrl.Result{}, nil
+ }
+
+ tctx := provider.NewDefaultTranslateContext(ctx)
+
+ tctx.RouteParentRefs = tr.Spec.ParentRefs
+ rk := utils.NamespacedNameKind(tr)
+ for _, gateway := range gateways {
+ if err := ProcessGatewayProxy(r.Client, r.Log, tctx,
gateway.Gateway, rk); err != nil {
+ acceptStatus.status = false
+ acceptStatus.msg = err.Error()
+ }
+ }
+
+ var backendRefErr error
+ if err := r.processUDPRoute(tctx, tr); err != nil {
+ // When encountering a backend reference error, it should not
affect the acceptance status
+ if types.IsSomeReasonError(err,
gatewayv1.RouteReasonInvalidKind) {
+ backendRefErr = err
+ } else {
+ acceptStatus.status = false
+ acceptStatus.msg = err.Error()
+ }
+ }
+
+ // Store the backend reference error for later use.
+ // If the backend reference error is because of an invalid kind, use
this error first
+ if err := r.processUDPRouteBackendRefs(tctx, req.NamespacedName); err
!= nil && backendRefErr == nil {
+ backendRefErr = err
+ }
+
+ ProcessBackendTrafficPolicy(r.Client, r.Log, tctx)
+ tr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0,
len(gateways))
+ for _, gateway := range gateways {
+ parentStatus := gatewayv1.RouteParentStatus{}
+ SetRouteParentRef(&parentStatus, gateway.Gateway.Name,
gateway.Gateway.Namespace)
+ for _, condition := range gateway.Conditions {
+ parentStatus.Conditions =
MergeCondition(parentStatus.Conditions, condition)
+ }
+ SetRouteConditionAccepted(&parentStatus, tr.GetGeneration(),
acceptStatus.status, acceptStatus.msg)
+ SetRouteConditionResolvedRefs(&parentStatus,
tr.GetGeneration(), backendRefErr)
+
+ tr.Status.Parents = append(tr.Status.Parents, parentStatus)
+ }
+
+ r.Updater.Update(status.Update{
+ NamespacedName: utils.NamespacedName(tr),
+ Resource: &gatewayv1alpha2.UDPRoute{},
+ Mutator: status.MutatorFunc(func(obj client.Object)
client.Object {
+ t, ok := obj.(*gatewayv1alpha2.UDPRoute)
+ if !ok {
+ err := fmt.Errorf("unsupported object type %T",
obj)
+ panic(err)
+ }
+ tCopy := t.DeepCopy()
+ tCopy.Status = tr.Status
+ return tCopy
+ }),
+ })
+ UpdateStatus(r.Updater, r.Log, tctx)
+ if isRouteAccepted(gateways) {
+ routeToUpdate := tr
+ if err := r.Provider.Update(ctx, tctx, routeToUpdate); err !=
nil {
+ return ctrl.Result{}, err
+ }
+ }
+ return ctrl.Result{}, nil
+}
+
+func (r *UDPRouteReconciler) processUDPRoute(tctx *provider.TranslateContext,
udpRoute *gatewayv1alpha2.UDPRoute) error {
+ var terror error
+ for _, rule := range udpRoute.Spec.Rules {
+ for _, backend := range rule.BackendRefs {
+ if backend.Kind != nil && *backend.Kind != KindService {
+ terror =
types.NewInvalidKindError(*backend.Kind)
+ continue
+ }
+ tctx.BackendRefs = append(tctx.BackendRefs,
gatewayv1.BackendRef{
+ BackendObjectReference:
gatewayv1.BackendObjectReference{
+ Name: backend.Name,
+ Namespace: cmp.Or(backend.Namespace,
(*gatewayv1.Namespace)(&udpRoute.Namespace)),
+ Port: backend.Port,
+ },
+ })
+ }
+ }
+
+ return terror
+}
+
+func (r *UDPRouteReconciler) processUDPRouteBackendRefs(tctx
*provider.TranslateContext, trNN k8stypes.NamespacedName) error {
+ var terr error
+ for _, backend := range tctx.BackendRefs {
+ targetNN := k8stypes.NamespacedName{
+ Namespace: trNN.Namespace,
+ Name: string(backend.Name),
+ }
+ if backend.Namespace != nil {
+ targetNN.Namespace = string(*backend.Namespace)
+ }
+
+ if backend.Kind != nil && *backend.Kind != KindService {
+ terr = types.NewInvalidKindError(*backend.Kind)
+ continue
+ }
+
+ if backend.Port == nil {
+ terr = fmt.Errorf("port is required")
+ continue
+ }
+
+ var service corev1.Service
+ if err := r.Get(tctx, targetNN, &service); err != nil {
+ terr = err
+ if client.IgnoreNotFound(err) == nil {
+ terr = types.ReasonError{
+ Reason:
string(gatewayv1.RouteReasonBackendNotFound),
+ Message: fmt.Sprintf("Service %s not
found", targetNN),
+ }
+ }
+ continue
+ }
+
+ // if cross namespaces between UDPRoute and referenced Service,
check ReferenceGrant
+ if trNN.Namespace != targetNN.Namespace {
+ if permitted := checkReferenceGrant(tctx,
+ r.Client,
+ v1beta1.ReferenceGrantFrom{
+ Group: gatewayv1.GroupName,
+ Kind: KindUDPRoute,
+ Namespace:
v1beta1.Namespace(trNN.Namespace),
+ },
+ gatewayv1.ObjectReference{
+ Group: corev1.GroupName,
+ Kind: KindService,
+ Name:
gatewayv1.ObjectName(targetNN.Name),
+ Namespace:
(*gatewayv1.Namespace)(&targetNN.Namespace),
+ },
+ ); !permitted {
+ terr = types.ReasonError{
+ Reason:
string(v1beta1.RouteReasonRefNotPermitted),
+ Message: fmt.Sprintf("%s is in a
different namespace than the UDPRoute %s and no ReferenceGrant allowing
reference is configured", targetNN, trNN),
+ }
+ continue
+ }
+ }
+
+ if service.Spec.Type == corev1.ServiceTypeExternalName {
+ tctx.Services[targetNN] = &service
+ continue
+ }
+
+ portExists := false
+ for _, port := range service.Spec.Ports {
+ if port.Port == int32(*backend.Port) {
+ portExists = true
+ break
+ }
+ }
+ if !portExists {
+ terr = fmt.Errorf("port %d not found in service %s",
*backend.Port, targetNN.Name)
+ continue
+ }
+ tctx.Services[targetNN] = &service
+
+ endpointSliceList := new(discoveryv1.EndpointSliceList)
+ if err := r.List(tctx, endpointSliceList,
+ client.InNamespace(targetNN.Namespace),
+ client.MatchingLabels{
+ discoveryv1.LabelServiceName: targetNN.Name,
+ },
+ ); err != nil {
+ r.Log.Error(err, "failed to list endpoint slices",
"Service", targetNN)
+ terr = err
+ continue
+ }
+
+ tctx.EndpointSlices[targetNN] = endpointSliceList.Items
+ }
+ return terr
+}
+
+func (r *UDPRouteReconciler) listUDPRoutesForReferenceGrant(ctx
context.Context, obj client.Object) (requests []reconcile.Request) {
+ grant, ok := obj.(*v1beta1.ReferenceGrant)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to ReferenceGrant")
+ return nil
+ }
+
+ var udpRouteList gatewayv1alpha2.UDPRouteList
+ if err := r.List(ctx, &udpRouteList); err != nil {
+ r.Log.Error(err, "failed to list udproutes for reference
ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace:
obj.GetNamespace(), Name: obj.GetName()})
+ return nil
+ }
+
+ for _, udpRoute := range udpRouteList.Items {
+ tr := v1beta1.ReferenceGrantFrom{
+ Group: gatewayv1.GroupName,
+ Kind: KindUDPRoute,
+ Namespace: v1beta1.Namespace(udpRoute.GetNamespace()),
+ }
+ for _, from := range grant.Spec.From {
+ if from == tr {
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{
+ Namespace:
udpRoute.GetNamespace(),
+ Name: udpRoute.GetName(),
+ },
+ })
+ }
+ }
+ }
+ return requests
+}
+
+func (r *UDPRouteReconciler) listUDPRoutesByServiceRef(ctx context.Context,
obj client.Object) []reconcile.Request {
+ endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to EndpointSlice")
+ return nil
+ }
+ namespace := endpointSlice.GetNamespace()
+ serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName]
+
+ trList := &gatewayv1alpha2.UDPRouteList{}
+ if err := r.List(ctx, trList, client.MatchingFields{
+ indexer.ServiceIndexRef: indexer.GenIndexKey(namespace,
serviceName),
+ }); err != nil {
+ r.Log.Error(err, "failed to list udproutes by service",
"service", serviceName)
+ return nil
+ }
+ requests := make([]reconcile.Request, 0, len(trList.Items))
+ for _, tr := range trList.Items {
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{
+ Namespace: tr.Namespace,
+ Name: tr.Name,
+ },
+ })
+ }
+ return requests
+}
diff --git a/internal/controller/utils.go b/internal/controller/utils.go
index afe4728b..4a0e9a65 100644
--- a/internal/controller/utils.go
+++ b/internal/controller/utils.go
@@ -62,6 +62,7 @@ const (
KindGateway = "Gateway"
KindHTTPRoute = "HTTPRoute"
KindTCPRoute = "TCPRoute"
+ KindUDPRoute = "UDPRoute"
KindGRPCRoute = "GRPCRoute"
KindGatewayClass = "GatewayClass"
KindIngress = "Ingress"
@@ -495,8 +496,8 @@ func routeHostnamesIntersectsWithListenerHostname(route
client.Object, listener
switch r := route.(type) {
case *gatewayv1.HTTPRoute:
return listenerHostnameIntersectWithRouteHostnames(listener,
r.Spec.Hostnames)
- case *gatewayv1alpha2.TCPRoute:
- return true // TCPRoute doesn't have Hostnames to match
+ case *gatewayv1alpha2.TCPRoute, *gatewayv1alpha2.UDPRoute:
+ return true // TCPRoute and UDPRoute don't have Hostnames to
match
case *gatewayv1.GRPCRoute:
return listenerHostnameIntersectWithRouteHostnames(listener,
r.Spec.Hostnames)
default:
@@ -667,6 +668,10 @@ func routeMatchesListenerType(route client.Object,
listener gatewayv1.Listener)
if listener.Protocol != gatewayv1.TCPProtocolType {
return false
}
+ case *gatewayv1alpha2.UDPRoute:
+ if listener.Protocol != gatewayv1.UDPProtocolType {
+ return false
+ }
default:
return false
}
diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go
index 33ba6fc0..8420909c 100644
--- a/internal/manager/controllers.go
+++ b/internal/manager/controllers.go
@@ -84,6 +84,8 @@ import (
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes/status,verbs=get;update
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes,verbs=get;list;watch
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes/status,verbs=get;update
+//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes,verbs=get;list;watch
+//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes/status,verbs=get;update
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants,verbs=get;list;watch
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants/status,verbs=get;update
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes,verbs=get;list;watch
@@ -132,6 +134,14 @@ func setupControllers(ctx context.Context, mgr
manager.Manager, pro provider.Pro
Updater: updater,
Readier: readier,
},
+ &controller.UDPRouteReconciler{
+ Client: mgr.GetClient(),
+ Scheme: mgr.GetScheme(),
+ Log:
ctrl.LoggerFrom(ctx).WithName("controllers").WithName(types.KindUDPRoute),
+ Provider: pro,
+ Updater: updater,
+ Readier: readier,
+ },
&controller.GRPCRouteReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
@@ -222,6 +232,7 @@ func registerReadinessGVK(c client.Client, readier
readiness.ReadinessManager) {
GVKs: []schema.GroupVersionKind{
types.GvkOf(&gatewayv1.HTTPRoute{}),
types.GvkOf(&gatewayv1alpha2.TCPRoute{}),
+ types.GvkOf(&gatewayv1alpha2.UDPRoute{}),
types.GvkOf(&gatewayv1.GRPCRoute{}),
},
},
diff --git a/internal/provider/apisix/provider.go
b/internal/provider/apisix/provider.go
index 5d855267..188e6c2c 100644
--- a/internal/provider/apisix/provider.go
+++ b/internal/provider/apisix/provider.go
@@ -112,6 +112,9 @@ func (d *apisixProvider) Update(ctx context.Context, tctx
*provider.TranslateCon
case *gatewayv1alpha2.TCPRoute:
result, err = d.translator.TranslateTCPRoute(tctx, t.DeepCopy())
resourceTypes = append(resourceTypes, adctypes.TypeService)
+ case *gatewayv1alpha2.UDPRoute:
+ result, err = d.translator.TranslateUDPRoute(tctx, t.DeepCopy())
+ resourceTypes = append(resourceTypes, adctypes.TypeService)
case *gatewayv1.GRPCRoute:
result, err = d.translator.TranslateGRPCRoute(tctx,
t.DeepCopy())
resourceTypes = append(resourceTypes, adctypes.TypeService)
@@ -185,10 +188,7 @@ func (d *apisixProvider) Delete(ctx context.Context, obj
client.Object) error {
var resourceTypes []string
var labels map[string]string
switch obj.(type) {
- case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute:
- resourceTypes = append(resourceTypes, adctypes.TypeService)
- labels = label.GenLabel(obj)
- case *gatewayv1alpha2.TCPRoute:
+ case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute,
*gatewayv1alpha2.TCPRoute, *gatewayv1alpha2.UDPRoute:
resourceTypes = append(resourceTypes, adctypes.TypeService)
labels = label.GenLabel(obj)
case *gatewayv1.Gateway:
diff --git a/internal/provider/apisix/status.go
b/internal/provider/apisix/status.go
index 6bf2a355..3bc3dcd6 100644
--- a/internal/provider/apisix/status.go
+++ b/internal/provider/apisix/status.go
@@ -68,6 +68,7 @@ func (d *apisixProvider) handleStatusUpdate(statusUpdateMap
map[types.Namespaced
d.statusUpdateMap = statusUpdateMap
}
+//nolint:gocyclo
func (d *apisixProvider) updateStatus(nnk types.NamespacedNameKind, condition
metav1.Condition) {
switch nnk.Kind {
case types.KindApisixRoute:
@@ -145,6 +146,41 @@ func (d *apisixProvider) updateStatus(nnk
types.NamespacedNameKind, condition me
return cp
}),
})
+ case types.KindUDPRoute:
+ parentRefs :=
d.client.ConfigManager.GetConfigRefsByResourceKey(nnk)
+ log.Debugw("updating UDPRoute status", zap.Any("parentRefs",
parentRefs))
+ gatewayRefs := map[types.NamespacedNameKind]struct{}{}
+ for _, parentRef := range parentRefs {
+ if parentRef.Kind == types.KindGateway {
+ gatewayRefs[parentRef] = struct{}{}
+ }
+ }
+ d.updater.Update(status.Update{
+ NamespacedName: nnk.NamespacedName(),
+ Resource: &gatewayv1alpha2.UDPRoute{},
+ Mutator: status.MutatorFunc(func(obj client.Object)
client.Object {
+ cp := obj.(*gatewayv1alpha2.UDPRoute).DeepCopy()
+ gatewayNs := cp.GetNamespace()
+ for i, ref := range cp.Status.Parents {
+ ns := gatewayNs
+ if ref.ParentRef.Namespace != nil {
+ ns =
string(*ref.ParentRef.Namespace)
+ }
+ if ref.ParentRef.Kind == nil ||
*ref.ParentRef.Kind == types.KindGateway {
+ nnk := types.NamespacedNameKind{
+ Name:
string(ref.ParentRef.Name),
+ Namespace: ns,
+ Kind:
types.KindGateway,
+ }
+ if _, ok := gatewayRefs[nnk];
ok {
+ ref.Conditions =
cutils.MergeCondition(ref.Conditions, condition)
+ cp.Status.Parents[i] =
ref
+ }
+ }
+ }
+ return cp
+ }),
+ })
case types.KindTCPRoute:
parentRefs :=
d.client.ConfigManager.GetConfigRefsByResourceKey(nnk)
log.Debugw("updating TCPRoute status", zap.Any("parentRefs",
parentRefs))
diff --git a/internal/types/k8s.go b/internal/types/k8s.go
index 1cf20810..9396b940 100644
--- a/internal/types/k8s.go
+++ b/internal/types/k8s.go
@@ -35,6 +35,7 @@ const (
KindGateway = "Gateway"
KindHTTPRoute = "HTTPRoute"
KindTCPRoute = "TCPRoute"
+ KindUDPRoute = "UDPRoute"
KindGRPCRoute = "GRPCRoute"
KindGatewayClass = "GatewayClass"
KindIngress = "Ingress"
@@ -61,6 +62,8 @@ func KindOf(obj any) string {
return KindGateway
case *gatewayv1alpha2.TCPRoute:
return KindTCPRoute
+ case *gatewayv1alpha2.UDPRoute:
+ return KindUDPRoute
case *gatewayv1.HTTPRoute:
return KindHTTPRoute
case *gatewayv1.GRPCRoute:
@@ -109,6 +112,8 @@ func GvkOf(obj any) schema.GroupVersionKind {
return gatewayv1.SchemeGroupVersion.WithKind(kind)
case *gatewayv1alpha2.TCPRoute:
return gatewayv1alpha2.SchemeGroupVersion.WithKind(kind)
+ case *gatewayv1alpha2.UDPRoute:
+ return gatewayv1alpha2.SchemeGroupVersion.WithKind(kind)
case *gatewayv1beta1.ReferenceGrant:
return gatewayv1beta1.SchemeGroupVersion.WithKind(kind)
case *netv1.Ingress, *netv1.IngressClass:
diff --git a/test/e2e/framework/manifests/ingress.yaml
b/test/e2e/framework/manifests/ingress.yaml
index ec509096..2324d4dd 100644
--- a/test/e2e/framework/manifests/ingress.yaml
+++ b/test/e2e/framework/manifests/ingress.yaml
@@ -168,6 +168,7 @@ rules:
- grpcroutes
- httproutes
- tcproutes
+ - udproutes
verbs:
- get
- list
@@ -177,6 +178,7 @@ rules:
resources:
- httproutes/status
- tcproutes/status
+ - udproutes/status
verbs:
- get
- update
diff --git a/test/e2e/gatewayapi/udproute.go b/test/e2e/gatewayapi/udproute.go
new file mode 100644
index 00000000..8b3b5602
--- /dev/null
+++ b/test/e2e/gatewayapi/udproute.go
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gatewayapi
+
+import (
+ "fmt"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = Describe("UDPRoute E2E Test", Label("networking.k8s.io", "udproute"),
func() {
+ s := scaffold.NewDefaultScaffold()
+ Context("UDPRoute Base", func() {
+
+ var udpGateway = `
+apiVersion: gateway.networking.k8s.io/v1
+kind: Gateway
+metadata:
+ name: %s
+spec:
+ gatewayClassName: %s
+ listeners:
+ - name: udp
+ protocol: UDP
+ port: 80
+ allowedRoutes:
+ kinds:
+ - kind: UDPRoute
+ infrastructure:
+ parametersRef:
+ group: apisix.apache.org
+ kind: GatewayProxy
+ name: apisix-proxy-config
+`
+
+ var udpRoute = `
+apiVersion: gateway.networking.k8s.io/v1alpha2
+kind: UDPRoute
+metadata:
+ name: udp-app-1
+spec:
+ parentRefs:
+ - name: %s
+ sectionName: udp
+ rules:
+ - backendRefs:
+ - name: %s
+ port: %d
+`
+
+ BeforeEach(func() {
+ // Create GatewayProxy
+
Expect(s.CreateResourceFromString(s.GetGatewayProxySpec())).
+ NotTo(HaveOccurred(), "creating GatewayProxy")
+
+ // Create GatewayClass
+ gatewayClassName := s.Namespace()
+
Expect(s.CreateResourceFromString(s.GetGatewayClassYaml())).
+ NotTo(HaveOccurred(), "creating GatewayClass")
+ gcyaml, _ := s.GetResourceYaml("GatewayClass",
gatewayClassName)
+ s.ResourceApplied("GatewayClass", gatewayClassName,
gcyaml, 1)
+
+ // Create Gateway with UDP listener
+ gatewayName := s.Namespace()
+
Expect(s.CreateResourceFromString(fmt.Sprintf(udpGateway, gatewayName,
gatewayClassName))).
+ NotTo(HaveOccurred(), "creating Gateway")
+
+ gwyaml, _ := s.GetResourceYaml("Gateway", gatewayName)
+ s.ResourceApplied("Gateway", gatewayName, gwyaml, 1)
+ })
+
+ It("should route UDP traffic to backend service", func() {
+ dnsSvc := s.NewCoreDNSService()
+ gatewayName := s.Namespace()
+ By("creating UDPRoute")
+ Expect(s.CreateResourceFromString(fmt.Sprintf(udpRoute,
gatewayName, dnsSvc.Name, dnsSvc.Spec.Ports[0].Port))).
+ NotTo(HaveOccurred(), "creating UDPRoute")
+
+ // Verify UDPRoute status becomes programmed
+ routeYaml, _ := s.GetResourceYaml("UDPRoute",
"udp-app-1")
+ s.ResourceApplied("UDPRoute", "udp-app-1", routeYaml, 1)
+
+ svc := s.GetDataplaneService()
+
+ // test dns query
+ output, err :=
s.RunDigDNSClientFromK8s(fmt.Sprintf("@%s", svc.Name), "-p", "9200",
"github.com")
+ Expect(err).NotTo(HaveOccurred(), "dig github.com via
apisix udp proxy")
+ Expect(output).To(ContainSubstring("ADDITIONAL
SECTION"))
+
+ time.Sleep(3 * time.Second)
+ output = s.GetDeploymentLogs(scaffold.CoreDNSDeployment)
+ Expect(output).To(ContainSubstring("github.com. udp"))
+ })
+ })
+})