Copilot commented on code in PR #2570: URL: https://github.com/apache/apisix-ingress-controller/pull/2570#discussion_r2366515546
########## internal/adc/translator/httproute.go: ########## @@ -797,3 +771,47 @@ func (t *Translator) translateGatewayHTTPRouteMatch(match *gatewayv1.HTTPRouteMa return route, nil } + +func HeaderMatchToVars(matchType, name, value string) ([]adctypes.StringOrSlice, error) { + name = strings.ToLower(name) + name = strings.ReplaceAll(name, "-", "_") + + var this []adctypes.StringOrSlice + this = append(this, adctypes.StringOrSlice{ + StrVal: "http_" + name, + }) + + switch matchType { + case string(gatewayv1.HeaderMatchExact): + this = append(this, adctypes.StringOrSlice{ + StrVal: "==", + }) + case string(gatewayv1.HeaderMatchRegularExpression): + this = append(this, adctypes.StringOrSlice{ + StrVal: "~~", + }) + default: + return nil, errors.New("unknown header match type " + matchType) + } + + this = append(this, adctypes.StringOrSlice{ + StrVal: value, + }) + return this, nil +} + +func (t *Translator) translateHTTPRouteHeaderMatchToVars(header gatewayv1.HTTPHeaderMatch) ([]adctypes.StringOrSlice, error) { + var matchType string + if header.Type != nil { + matchType = string(*header.Type) + } + return HeaderMatchToVars(matchType, string(header.Name), header.Value) +} Review Comment: When header.Type is nil, matchType remains an empty string, which will cause HeaderMatchToVars to return an error for 'unknown header match type'. The default should be set to string(gatewayv1.HeaderMatchExact). ########## internal/controller/grpcroute_controller.go: ########## @@ -0,0 +1,584 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// GRPCRouteReconciler reconciles a GatewayClass object. 
+type GRPCRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + genericEvent chan event.GenericEvent + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *GRPCRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.genericEvent = make(chan event.GenericEvent, 100) + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1.GRPCRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByServiceBef), + ). + Watches(&v1alpha1.PluginConfig{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByExtensionRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForBackendTrafficPolicy), + builder.WithPredicates( + BackendTrafficPolicyPredicateFunc(r.genericEvent), + ), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGatewayProxy), + ). 
+ WatchesRawSource( + source.Channel( + r.genericEvent, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRouteForGenericEvent), + ), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindGRPCRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByExtensionRef(ctx context.Context, obj client.Object) []reconcile.Request { + pluginconfig, ok := obj.(*v1alpha1.PluginConfig) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := pluginconfig.GetNamespace() + name := pluginconfig.GetName() + + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ExtensionRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by extension reference", "extension", name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1.GRPCRoute{}, req.NamespacedName) + gr := new(gatewayv1.GRPCRoute) + if err := r.Get(ctx, req.NamespacedName, gr); err != nil { + if client.IgnoreNotFound(err) == nil { + gr.Namespace = req.Namespace + gr.Name = req.Name + + gr.TypeMeta = metav1.TypeMeta{ + Kind: KindGRPCRoute, + APIVersion: gatewayv1.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, gr); err != nil { + r.Log.Error(err, "failed to delete grpcroute", "grpcroute", gr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + 
type ResourceStatus struct { + status bool + msg string + } + + // Only keep acceptStatus since we're using error objects directly now + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, gr, gr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = gr.Spec.ParentRefs + rk := utils.NamespacedNameKind(gr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + if gateway.Listener != nil { + tctx.Listeners = append(tctx.Listeners, *gateway.Listener) + } + } + + var backendRefErr error + if err := r.processGRPCRoute(tctx, gr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. 
+ // If the backend reference error is because of an invalid kind, use this error first + if err := r.processGRPCRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + + // TODO: diff the old and new status + gr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, gr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, gr.GetGeneration(), backendRefErr) + + gr.Status.Parents = append(gr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(gr), + Resource: &gatewayv1.GRPCRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + h, ok := obj.(*gatewayv1.GRPCRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + hCopy := h.DeepCopy() + hCopy.Status = gr.Status + return hCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + + if isRouteAccepted(gateways) && err == nil { + routeToUpdate := gr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByServiceBef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + 
gList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, gList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(gList.Items)) + for _, gr := range gList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + grpcRouteList := []gatewayv1.GRPCRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service reference", "service", targetRef.Name) Review Comment: Typo in log messages: 'grocroutes' should be 'grpcroutes'. ```suggestion r.Log.Error(err, "failed to list grpcroutes by service reference", "service", targetRef.Name) ``` ########## internal/controller/grpcroute_controller.go: ########## @@ -0,0 +1,584 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// GRPCRouteReconciler reconciles a GatewayClass object. 
+type GRPCRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + genericEvent chan event.GenericEvent + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *GRPCRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.genericEvent = make(chan event.GenericEvent, 100) + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1.GRPCRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByServiceBef), + ). + Watches(&v1alpha1.PluginConfig{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByExtensionRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForBackendTrafficPolicy), + builder.WithPredicates( + BackendTrafficPolicyPredicateFunc(r.genericEvent), + ), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGatewayProxy), + ). 
+ WatchesRawSource( + source.Channel( + r.genericEvent, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRouteForGenericEvent), + ), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindGRPCRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByExtensionRef(ctx context.Context, obj client.Object) []reconcile.Request { + pluginconfig, ok := obj.(*v1alpha1.PluginConfig) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := pluginconfig.GetNamespace() + name := pluginconfig.GetName() + + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ExtensionRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by extension reference", "extension", name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1.GRPCRoute{}, req.NamespacedName) + gr := new(gatewayv1.GRPCRoute) + if err := r.Get(ctx, req.NamespacedName, gr); err != nil { + if client.IgnoreNotFound(err) == nil { + gr.Namespace = req.Namespace + gr.Name = req.Name + + gr.TypeMeta = metav1.TypeMeta{ + Kind: KindGRPCRoute, + APIVersion: gatewayv1.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, gr); err != nil { + r.Log.Error(err, "failed to delete grpcroute", "grpcroute", gr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + 
type ResourceStatus struct { + status bool + msg string + } + + // Only keep acceptStatus since we're using error objects directly now + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, gr, gr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = gr.Spec.ParentRefs + rk := utils.NamespacedNameKind(gr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + if gateway.Listener != nil { + tctx.Listeners = append(tctx.Listeners, *gateway.Listener) + } + } + + var backendRefErr error + if err := r.processGRPCRoute(tctx, gr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. 
+ // If the backend reference error is because of an invalid kind, use this error first + if err := r.processGRPCRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + + // TODO: diff the old and new status + gr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, gr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, gr.GetGeneration(), backendRefErr) + + gr.Status.Parents = append(gr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(gr), + Resource: &gatewayv1.GRPCRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + h, ok := obj.(*gatewayv1.GRPCRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + hCopy := h.DeepCopy() + hCopy.Status = gr.Status + return hCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + + if isRouteAccepted(gateways) && err == nil { + routeToUpdate := gr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByServiceBef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + 
gList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, gList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(gList.Items)) + for _, gr := range gList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + grpcRouteList := []gatewayv1.GRPCRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service reference", "service", targetRef.Name) Review Comment: Typo in log messages: 'grocroutes' should be 'grpcroutes'. ```suggestion r.Log.Error(err, "failed to list grpcroutes by service reference", "service", targetRef.Name) ``` ########## internal/controller/grpcroute_controller.go: ########## @@ -0,0 +1,584 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// GRPCRouteReconciler reconciles a GatewayClass object. 
+type GRPCRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + genericEvent chan event.GenericEvent + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *GRPCRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.genericEvent = make(chan event.GenericEvent, 100) + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1.GRPCRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByServiceBef), + ). + Watches(&v1alpha1.PluginConfig{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByExtensionRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForBackendTrafficPolicy), + builder.WithPredicates( + BackendTrafficPolicyPredicateFunc(r.genericEvent), + ), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGatewayProxy), + ). 
+ WatchesRawSource( + source.Channel( + r.genericEvent, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRouteForGenericEvent), + ), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindGRPCRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByExtensionRef(ctx context.Context, obj client.Object) []reconcile.Request { + pluginconfig, ok := obj.(*v1alpha1.PluginConfig) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := pluginconfig.GetNamespace() + name := pluginconfig.GetName() + + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ExtensionRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by extension reference", "extension", name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1.GRPCRoute{}, req.NamespacedName) + gr := new(gatewayv1.GRPCRoute) + if err := r.Get(ctx, req.NamespacedName, gr); err != nil { + if client.IgnoreNotFound(err) == nil { + gr.Namespace = req.Namespace + gr.Name = req.Name + + gr.TypeMeta = metav1.TypeMeta{ + Kind: KindGRPCRoute, + APIVersion: gatewayv1.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, gr); err != nil { + r.Log.Error(err, "failed to delete grpcroute", "grpcroute", gr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + 
type ResourceStatus struct { + status bool + msg string + } + + // Only keep acceptStatus since we're using error objects directly now + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, gr, gr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = gr.Spec.ParentRefs + rk := utils.NamespacedNameKind(gr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + if gateway.Listener != nil { + tctx.Listeners = append(tctx.Listeners, *gateway.Listener) + } + } + + var backendRefErr error + if err := r.processGRPCRoute(tctx, gr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. 
+ // If the backend reference error is because of an invalid kind, use this error first + if err := r.processGRPCRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + + // TODO: diff the old and new status + gr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, gr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, gr.GetGeneration(), backendRefErr) + + gr.Status.Parents = append(gr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(gr), + Resource: &gatewayv1.GRPCRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + h, ok := obj.(*gatewayv1.GRPCRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + hCopy := h.DeepCopy() + hCopy.Status = gr.Status + return hCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + + if isRouteAccepted(gateways) && err == nil { + routeToUpdate := gr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByServiceBef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + 
gList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, gList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(gList.Items)) + for _, gr := range gList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + grpcRouteList := []gatewayv1.GRPCRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service reference", "service", targetRef.Name) + return nil + } + grpcRouteList = append(grpcRouteList, grList.Items...) 
+ } + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) + requests := make([]reconcile.Request, 0, len(grpcRouteList)) + for _, gr := range grpcRouteList { + key := k8stypes.NamespacedName{ + Namespace: gr.Namespace, + Name: gr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { + gateway, ok := obj.(*gatewayv1.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway") + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by gateway", "gateway", gateway.Name) Review Comment: Typo in log messages: 'grocroutes' should be 'grpcroutes'. ```suggestion r.Log.Error(err, "failed to list grpcroutes by gateway", "gateway", gateway.Name) ``` ########## internal/controller/grpcroute_controller.go: ########## @@ -0,0 +1,584 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// GRPCRouteReconciler reconciles a GatewayClass object. 
+type GRPCRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + genericEvent chan event.GenericEvent + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *GRPCRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.genericEvent = make(chan event.GenericEvent, 100) + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1.GRPCRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByServiceBef), + ). + Watches(&v1alpha1.PluginConfig{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByExtensionRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForBackendTrafficPolicy), + builder.WithPredicates( + BackendTrafficPolicyPredicateFunc(r.genericEvent), + ), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGatewayProxy), + ). 
+ WatchesRawSource( + source.Channel( + r.genericEvent, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRouteForGenericEvent), + ), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindGRPCRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByExtensionRef(ctx context.Context, obj client.Object) []reconcile.Request { + pluginconfig, ok := obj.(*v1alpha1.PluginConfig) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := pluginconfig.GetNamespace() + name := pluginconfig.GetName() + + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ExtensionRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by extension reference", "extension", name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1.GRPCRoute{}, req.NamespacedName) + gr := new(gatewayv1.GRPCRoute) + if err := r.Get(ctx, req.NamespacedName, gr); err != nil { + if client.IgnoreNotFound(err) == nil { + gr.Namespace = req.Namespace + gr.Name = req.Name + + gr.TypeMeta = metav1.TypeMeta{ + Kind: KindGRPCRoute, + APIVersion: gatewayv1.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, gr); err != nil { + r.Log.Error(err, "failed to delete grpcroute", "grpcroute", gr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + 
type ResourceStatus struct { + status bool + msg string + } + + // Only keep acceptStatus since we're using error objects directly now + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, gr, gr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = gr.Spec.ParentRefs + rk := utils.NamespacedNameKind(gr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + if gateway.Listener != nil { + tctx.Listeners = append(tctx.Listeners, *gateway.Listener) + } + } + + var backendRefErr error + if err := r.processGRPCRoute(tctx, gr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. 
+ // If the backend reference error is because of an invalid kind, use this error first + if err := r.processGRPCRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + + // TODO: diff the old and new status + gr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, gr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, gr.GetGeneration(), backendRefErr) + + gr.Status.Parents = append(gr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(gr), + Resource: &gatewayv1.GRPCRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + h, ok := obj.(*gatewayv1.GRPCRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + hCopy := h.DeepCopy() + hCopy.Status = gr.Status + return hCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + + if isRouteAccepted(gateways) && err == nil { + routeToUpdate := gr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByServiceBef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + 
gList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, gList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(gList.Items)) + for _, gr := range gList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + grpcRouteList := []gatewayv1.GRPCRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service reference", "service", targetRef.Name) + return nil + } + grpcRouteList = append(grpcRouteList, grList.Items...) 
+ } + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) + requests := make([]reconcile.Request, 0, len(grpcRouteList)) + for _, gr := range grpcRouteList { + key := k8stypes.NamespacedName{ + Namespace: gr.Namespace, + Name: gr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { + gateway, ok := obj.(*gatewayv1.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway") + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by gateway", "gateway", gateway.Name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRouteForGenericEvent(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + switch obj.(type) { + case *v1alpha1.BackendTrafficPolicy: + return r.listGRPCRoutesForBackendTrafficPolicy(ctx, obj) + default: + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } +} + +func (r *GRPCRouteReconciler) processGRPCRouteBackendRefs(tctx *provider.TranslateContext, grNN k8stypes.NamespacedName) error { + var terr error + for _, backend := range tctx.BackendRefs { + targetNN := k8stypes.NamespacedName{ + Namespace: grNN.Namespace, + Name: 
string(backend.Name), + } + if backend.Namespace != nil { + targetNN.Namespace = string(*backend.Namespace) + } + + if backend.Kind != nil && *backend.Kind != "Service" { + terr = types.NewInvalidKindError(*backend.Kind) + continue + } + + if backend.Port == nil { + terr = fmt.Errorf("port is required") + continue + } + + var service corev1.Service + if err := r.Get(tctx, targetNN, &service); err != nil { + terr = err + if client.IgnoreNotFound(err) == nil { + terr = types.ReasonError{ + Reason: string(gatewayv1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Service %s not found", targetNN), + } + } + continue + } + + // if cross namespaces between GRPCRoute and referenced Service, check ReferenceGrant + if grNN.Namespace != targetNN.Namespace { + if permitted := checkReferenceGrant(tctx, + r.Client, + v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindGRPCRoute, + Namespace: v1beta1.Namespace(grNN.Namespace), + }, + gatewayv1.ObjectReference{ + Group: corev1.GroupName, + Kind: KindService, + Name: gatewayv1.ObjectName(targetNN.Name), + Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace), + }, + ); !permitted { + terr = types.ReasonError{ + Reason: string(v1beta1.RouteReasonRefNotPermitted), + Message: fmt.Sprintf("%s is in a different namespace than the GRPCRoute %s and no ReferenceGrant allowing reference is configured", targetNN, grNN), + } + continue + } + } + + if service.Spec.Type == corev1.ServiceTypeExternalName { + tctx.Services[targetNN] = &service + continue + } + + portExists := false + for _, port := range service.Spec.Ports { + if port.Port == int32(*backend.Port) { + portExists = true + break + } + } + if !portExists { + terr = fmt.Errorf("port %d not found in service %s", *backend.Port, targetNN.Name) + continue + } + tctx.Services[targetNN] = &service + + endpointSliceList := new(discoveryv1.EndpointSliceList) + if err := r.List(tctx, endpointSliceList, + client.InNamespace(targetNN.Namespace), + client.MatchingLabels{ + 
discoveryv1.LabelServiceName: targetNN.Name, + }, + ); err != nil { + r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN) + terr = err + continue + } + + tctx.EndpointSlices[targetNN] = endpointSliceList.Items + } + return terr +} + +func (r *GRPCRouteReconciler) processGRPCRoute(tctx *provider.TranslateContext, grpcroute *gatewayv1.GRPCRoute) error { + var terror error + for _, rule := range grpcroute.Spec.Rules { + for _, filter := range rule.Filters { + if filter.Type != gatewayv1.GRPCRouteFilterExtensionRef || filter.ExtensionRef == nil { + continue + } + if filter.ExtensionRef.Kind == "PluginConfig" { + pluginconfig := new(v1alpha1.PluginConfig) + if err := r.Get(context.Background(), client.ObjectKey{ + Namespace: grpcroute.GetNamespace(), + Name: string(filter.ExtensionRef.Name), + }, pluginconfig); err != nil { + terror = err + continue + } + tctx.PluginConfigs[k8stypes.NamespacedName{ + Namespace: grpcroute.GetNamespace(), + Name: string(filter.ExtensionRef.Name), + }] = pluginconfig + } + } + for _, backend := range rule.BackendRefs { + if backend.Kind != nil && *backend.Kind != "Service" { + terror = types.NewInvalidKindError(*backend.Kind) + continue + } + tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{ + BackendObjectReference: gatewayv1.BackendObjectReference{ + Name: backend.Name, + Namespace: cmp.Or(backend.Namespace, (*gatewayv1.Namespace)(&grpcroute.Namespace)), + Port: backend.Port, + }, + }) + } + } + + return terror +} + +// listGRPCRoutesForGatewayProxy list all GRPCRoute resources that are affected by a given GatewayProxy +func (r *GRPCRouteReconciler) listGRPCRoutesForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request { + gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to GatewayProxy") + return nil + } + + namespace := gatewayProxy.GetNamespace() + name := gatewayProxy.GetName() + + // find 
all gateways that reference this gateway proxy + gatewayList := &gatewayv1.GatewayList{} + if err := r.List(ctx, gatewayList, client.MatchingFields{ + indexer.ParametersRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", gatewayProxy.GetName()) + return nil + } + + var requests []reconcile.Request + + // for each gateway, find all GRPCRoute resources that reference it + for _, gateway := range gatewayList.Items { + grpcRouteList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grpcRouteList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes for gateway", "gateway", gateway.Name) Review Comment: Typo in log messages: 'grocroutes' should be 'grpcroutes'. ########## internal/controller/grpcroute_controller.go: ########## @@ -0,0 +1,584 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// GRPCRouteReconciler reconciles a GatewayClass object. +type GRPCRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + genericEvent chan event.GenericEvent + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *GRPCRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.genericEvent = make(chan event.GenericEvent, 100) + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1.GRPCRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByServiceBef), + ). 
+ Watches(&v1alpha1.PluginConfig{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByExtensionRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForBackendTrafficPolicy), + builder.WithPredicates( + BackendTrafficPolicyPredicateFunc(r.genericEvent), + ), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGatewayProxy), + ). + WatchesRawSource( + source.Channel( + r.genericEvent, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRouteForGenericEvent), + ), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindGRPCRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByExtensionRef(ctx context.Context, obj client.Object) []reconcile.Request { + pluginconfig, ok := obj.(*v1alpha1.PluginConfig) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := pluginconfig.GetNamespace() + name := pluginconfig.GetName() + + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ExtensionRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by extension reference", "extension", name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range 
grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1.GRPCRoute{}, req.NamespacedName) + gr := new(gatewayv1.GRPCRoute) + if err := r.Get(ctx, req.NamespacedName, gr); err != nil { + if client.IgnoreNotFound(err) == nil { + gr.Namespace = req.Namespace + gr.Name = req.Name + + gr.TypeMeta = metav1.TypeMeta{ + Kind: KindGRPCRoute, + APIVersion: gatewayv1.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, gr); err != nil { + r.Log.Error(err, "failed to delete grpcroute", "grpcroute", gr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + type ResourceStatus struct { + status bool + msg string + } + + // Only keep acceptStatus since we're using error objects directly now + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, gr, gr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = gr.Spec.ParentRefs + rk := utils.NamespacedNameKind(gr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + if gateway.Listener != nil { + tctx.Listeners = append(tctx.Listeners, *gateway.Listener) + } + } + + var backendRefErr error + if err := r.processGRPCRoute(tctx, gr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + 
acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. + // If the backend reference error is because of an invalid kind, use this error first + if err := r.processGRPCRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + + // TODO: diff the old and new status + gr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, gr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, gr.GetGeneration(), backendRefErr) + + gr.Status.Parents = append(gr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(gr), + Resource: &gatewayv1.GRPCRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + h, ok := obj.(*gatewayv1.GRPCRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + hCopy := h.DeepCopy() + hCopy.Status = gr.Status + return hCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + + if isRouteAccepted(gateways) && err == nil { + routeToUpdate := gr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByServiceBef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil 
+ } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + gList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, gList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(gList.Items)) + for _, gr := range gList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + grpcRouteList := []gatewayv1.GRPCRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by service reference", "service", targetRef.Name) + return nil + } + grpcRouteList = append(grpcRouteList, grList.Items...) 
+ } + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) + requests := make([]reconcile.Request, 0, len(grpcRouteList)) + for _, gr := range grpcRouteList { + key := k8stypes.NamespacedName{ + Namespace: gr.Namespace, + Name: gr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { + gateway, ok := obj.(*gatewayv1.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway") + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes by gateway", "gateway", gateway.Name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRouteForGenericEvent(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + switch obj.(type) { + case *v1alpha1.BackendTrafficPolicy: + return r.listGRPCRoutesForBackendTrafficPolicy(ctx, obj) + default: + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } +} + +func (r *GRPCRouteReconciler) processGRPCRouteBackendRefs(tctx *provider.TranslateContext, grNN k8stypes.NamespacedName) error { + var terr error + for _, backend := range tctx.BackendRefs { + targetNN := k8stypes.NamespacedName{ + Namespace: grNN.Namespace, + Name: 
string(backend.Name), + } + if backend.Namespace != nil { + targetNN.Namespace = string(*backend.Namespace) + } + + if backend.Kind != nil && *backend.Kind != "Service" { + terr = types.NewInvalidKindError(*backend.Kind) + continue + } + + if backend.Port == nil { + terr = fmt.Errorf("port is required") + continue + } + + var service corev1.Service + if err := r.Get(tctx, targetNN, &service); err != nil { + terr = err + if client.IgnoreNotFound(err) == nil { + terr = types.ReasonError{ + Reason: string(gatewayv1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Service %s not found", targetNN), + } + } + continue + } + + // if cross namespaces between GRPCRoute and referenced Service, check ReferenceGrant + if grNN.Namespace != targetNN.Namespace { + if permitted := checkReferenceGrant(tctx, + r.Client, + v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindGRPCRoute, + Namespace: v1beta1.Namespace(grNN.Namespace), + }, + gatewayv1.ObjectReference{ + Group: corev1.GroupName, + Kind: KindService, + Name: gatewayv1.ObjectName(targetNN.Name), + Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace), + }, + ); !permitted { + terr = types.ReasonError{ + Reason: string(v1beta1.RouteReasonRefNotPermitted), + Message: fmt.Sprintf("%s is in a different namespace than the GRPCRoute %s and no ReferenceGrant allowing reference is configured", targetNN, grNN), + } + continue + } + } + + if service.Spec.Type == corev1.ServiceTypeExternalName { + tctx.Services[targetNN] = &service + continue + } + + portExists := false + for _, port := range service.Spec.Ports { + if port.Port == int32(*backend.Port) { + portExists = true + break + } + } + if !portExists { + terr = fmt.Errorf("port %d not found in service %s", *backend.Port, targetNN.Name) + continue + } + tctx.Services[targetNN] = &service + + endpointSliceList := new(discoveryv1.EndpointSliceList) + if err := r.List(tctx, endpointSliceList, + client.InNamespace(targetNN.Namespace), + client.MatchingLabels{ + 
discoveryv1.LabelServiceName: targetNN.Name, + }, + ); err != nil { + r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN) + terr = err + continue + } + + tctx.EndpointSlices[targetNN] = endpointSliceList.Items + } + return terr +} + +func (r *GRPCRouteReconciler) processGRPCRoute(tctx *provider.TranslateContext, grpcroute *gatewayv1.GRPCRoute) error { + var terror error + for _, rule := range grpcroute.Spec.Rules { + for _, filter := range rule.Filters { + if filter.Type != gatewayv1.GRPCRouteFilterExtensionRef || filter.ExtensionRef == nil { + continue + } + if filter.ExtensionRef.Kind == "PluginConfig" { + pluginconfig := new(v1alpha1.PluginConfig) + if err := r.Get(context.Background(), client.ObjectKey{ + Namespace: grpcroute.GetNamespace(), + Name: string(filter.ExtensionRef.Name), + }, pluginconfig); err != nil { + terror = err + continue + } + tctx.PluginConfigs[k8stypes.NamespacedName{ + Namespace: grpcroute.GetNamespace(), + Name: string(filter.ExtensionRef.Name), + }] = pluginconfig + } + } + for _, backend := range rule.BackendRefs { + if backend.Kind != nil && *backend.Kind != "Service" { + terror = types.NewInvalidKindError(*backend.Kind) + continue + } + tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{ + BackendObjectReference: gatewayv1.BackendObjectReference{ + Name: backend.Name, + Namespace: cmp.Or(backend.Namespace, (*gatewayv1.Namespace)(&grpcroute.Namespace)), + Port: backend.Port, + }, + }) + } + } + + return terror +} + +// listGRPCRoutesForGatewayProxy list all GRPCRoute resources that are affected by a given GatewayProxy +func (r *GRPCRouteReconciler) listGRPCRoutesForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request { + gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to GatewayProxy") + return nil + } + + namespace := gatewayProxy.GetNamespace() + name := gatewayProxy.GetName() + + // find 
all gateways that reference this gateway proxy + gatewayList := &gatewayv1.GatewayList{} + if err := r.List(ctx, gatewayList, client.MatchingFields{ + indexer.ParametersRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", gatewayProxy.GetName()) + return nil + } + + var requests []reconcile.Request + + // for each gateway, find all GRPCRoute resources that reference it + for _, gateway := range gatewayList.Items { + grpcRouteList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grpcRouteList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list grocroutes for gateway", "gateway", gateway.Name) + continue + } + + for _, grpcRoute := range grpcRouteList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: grpcRoute.Namespace, + Name: grpcRoute.Name, + }, + }) + } + } + + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForReferenceGrant(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + grant, ok := obj.(*v1beta1.ReferenceGrant) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to ReferenceGrant") + return nil + } + + var grpcRouteList gatewayv1.GRPCRouteList + if err := r.List(ctx, &grpcRouteList); err != nil { + r.Log.Error(err, "failed to list grocroutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) Review Comment: Typo in log messages: 'grocroutes' should be 'grpcroutes'. ```suggestion r.Log.Error(err, "failed to list grpcroutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org