This is an automated email from the ASF dual-hosted git repository. alinsran pushed a commit to branch v2.0.0 in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/v2.0.0 by this push: new 9d7e0183 feat: add synchronization status to CRD (#2460) 9d7e0183 is described below commit 9d7e01831f5b1b1e97bdaa522ff8a5161ba19290 Author: AlinsRan <alins...@apache.org> AuthorDate: Mon Jul 7 14:37:01 2025 +0800 feat: add synchronization status to CRD (#2460) --- api/adc/types.go | 9 + internal/controller/apisixroute_controller.go | 37 +-- internal/controller/httproute_controller.go | 29 +-- internal/controller/utils.go | 34 +-- internal/controller/utils/utils.go | 71 ++++++ internal/manager/run.go | 2 +- internal/provider/adc/adc.go | 29 ++- internal/provider/adc/executor.go | 52 ++-- internal/provider/adc/status.go | 237 ++++++++++++++++++ internal/provider/adc/store.go | 93 ++++++- internal/types/error.go | 94 +++++++ internal/types/{types.go => k8s.go} | 23 +- internal/types/types.go | 11 + internal/utils/k8s.go | 8 + test/e2e/apisix/status.go | 342 ++++++++++++++++++++++++++ test/e2e/framework/manifests/ingress.yaml | 1 + test/e2e/scaffold/apisix_deployer.go | 6 + test/e2e/scaffold/deployer.go | 1 + 18 files changed, 980 insertions(+), 99 deletions(-) diff --git a/api/adc/types.go b/api/adc/types.go index d2ecba2d..0eb7d12a 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -77,6 +77,15 @@ const ( PassHostRewrite = "rewrite" ) +const ( + TypeRoute = "route" + TypeService = "service" + TypeConsumer = "consumer" + TypeSSL = "ssl" + TypeGlobalRule = "global_rule" + TypePluginMetadata = "plugin_metadata" +) + type Object interface { GetLabels() map[string]string } diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index bce27913..a07c6e76 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -32,7 +32,7 @@ import ( networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" + k8stypes "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,6 +45,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils" ) @@ -134,7 +135,7 @@ func (r *ApisixRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } if err = r.Provider.Update(ctx, tctx, &ar); err != nil { - err = ReasonError{ + err = types.ReasonError{ Reason: string(apiv2.ConditionReasonSyncFailed), Message: err.Error(), } @@ -152,7 +153,7 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov for httpIndex, http := range in.Spec.HTTP { // check rule names if _, ok := rules[http.Name]; ok { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: "duplicate route rule name", } @@ -178,7 +179,7 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov // check vars if _, err := http.Match.NginxVars.ToVars(); err != nil { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: fmt.Sprintf(".spec.http[%d].match.exprs: %s", httpIndex, err.Error()), } @@ -186,7 +187,7 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov // validate remote address if err := utils.ValidateRemoteAddrs(http.Match.RemoteAddrs); err != nil { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: fmt.Sprintf(".spec.http[%d].match.remoteAddrs: %s", httpIndex, err.Error()), } @@ -220,7 +221,7 @@ func (r *ApisixRouteReconciler) validatePluginConfig(ctx context.Context, tc *pr pcNN = utils.NamespacedName(&pc) ) if err := r.Get(ctx, pcNN, &pc); err != nil { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: fmt.Sprintf("failed to get ApisixPluginConfig: %s", pcNN), } @@ -230,13 +231,13 @@ func (r *ApisixRouteReconciler) validatePluginConfig(ctx context.Context, tc *pr if in.Spec.IngressClassName != pc.Spec.IngressClassName && pc.Spec.IngressClassName != "" { var pcIC networkingv1.IngressClass if err := r.Get(ctx, client.ObjectKey{Name: pc.Spec.IngressClassName}, &pcIC); err != nil { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: fmt.Sprintf("failed to get IngressClass %s for ApisixPluginConfig %s: %v", pc.Spec.IngressClassName, pcNN, err), } } if !matchesController(pcIC.Spec.Controller) { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: fmt.Sprintf("ApisixPluginConfig %s references IngressClass %s with non-matching controller", pcNN, pc.Spec.IngressClassName), } @@ -271,7 +272,7 @@ func (r *ApisixRouteReconciler) validateSecrets(ctx context.Context, tc *provide secretNN = utils.NamespacedName(&secret) ) if err := r.Get(ctx, secretNN, &secret); err != nil { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: fmt.Sprintf("failed to get Secret: %s", secretNN), } @@ -282,18 +283,18 @@ func (r *ApisixRouteReconciler) validateSecrets(ctx context.Context, tc *provide } func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provider.TranslateContext, in *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error { - var backends = make(map[types.NamespacedName]struct{}) + var backends = make(map[k8stypes.NamespacedName]struct{}) for _, backend := range http.Backends { var ( au apiv2.ApisixUpstream service corev1.Service - serviceNN = types.NamespacedName{ + serviceNN = k8stypes.NamespacedName{ Namespace: in.GetNamespace(), Name: backend.ServiceName, } ) if _, ok := backends[serviceNN]; ok { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: fmt.Sprintf("duplicate backend service: %s", serviceNN), } @@ -344,7 +345,7 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid discoveryv1.LabelServiceName: service.Name, }, ); err != nil { - return ReasonError{ + return types.ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), Message: fmt.Sprintf("failed to list endpoint slices: %v", err), } @@ -366,7 +367,7 @@ func (r *ApisixRouteReconciler) validateUpstreams(ctx context.Context, tc *provi } var ( ups apiv2.ApisixUpstream - upsNN = types.NamespacedName{ + upsNN = k8stypes.NamespacedName{ Namespace: ar.GetNamespace(), Name: upstream.Name, } @@ -384,7 +385,7 @@ func (r *ApisixRouteReconciler) validateUpstreams(ctx context.Context, tc *provi if node.Type == apiv2.ExternalTypeService { var ( service corev1.Service - serviceNN = types.NamespacedName{Namespace: ups.GetNamespace(), Name: node.Name} + serviceNN = k8stypes.NamespacedName{Namespace: ups.GetNamespace(), Name: node.Name} ) if err := r.Get(ctx, serviceNN, &service); err != nil { r.Log.Error(err, "failed to get service in ApisixUpstream", "ApisixUpstream", upsNN, "Service", serviceNN) @@ -400,7 +401,7 @@ func (r *ApisixRouteReconciler) validateUpstreams(ctx context.Context, tc *provi if ups.Spec.TLSSecret != nil && ups.Spec.TLSSecret.Name != "" { var ( secret corev1.Secret - secretNN = types.NamespacedName{Namespace: cmp.Or(ups.Spec.TLSSecret.Namespace, ar.GetNamespace()), Name: ups.Spec.TLSSecret.Name} + secretNN = k8stypes.NamespacedName{Namespace: cmp.Or(ups.Spec.TLSSecret.Namespace, ar.GetNamespace()), Name: ups.Spec.TLSSecret.Name} ) if err := r.Get(ctx, secretNN, &secret); err != nil { r.Log.Error(err, "failed to get secret in ApisixUpstream", "ApisixUpstream", upsNN, "Secret", secretNN) @@ -578,7 +579,7 @@ func (r *ApisixRouteReconciler) listApisixRoutesForPluginConfig(ctx context.Cont return pkgutils.DedupComparable(requests) } -func (r *ApisixRouteReconciler) getSubsetLabels(tctx *provider.TranslateContext, auNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) map[string]string { +func (r *ApisixRouteReconciler) getSubsetLabels(tctx *provider.TranslateContext, auNN k8stypes.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) map[string]string { if backend.Subset == "" { return nil } @@ -621,7 +622,7 @@ func (r *ApisixRouteReconciler) filterEndpointSliceByTargetPod(ctx context.Conte var ( pod corev1.Pod - podNN = types.NamespacedName{ + podNN = k8stypes.NamespacedName{ Namespace: v.TargetRef.Namespace, Name: v.TargetRef.Name, } diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index 43d878fd..ae869761 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -31,7 +31,7 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -49,6 +49,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -186,7 +187,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( var backendRefErr error if err := r.processHTTPRoute(tctx, hr); err != nil { // When encountering a backend reference error, it should not affect the acceptance status - if IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { backendRefErr = err } else { acceptStatus.status = false @@ -340,10 +341,10 @@ func (r *HTTPRouteReconciler) listHTTPRoutesForBackendTrafficPolicy(ctx context. } httprouteList = append(httprouteList, hrList.Items...) } - var namespacedNameMap = make(map[types.NamespacedName]struct{}) + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) requests := make([]reconcile.Request, 0, len(httprouteList)) for _, hr := range httprouteList { - key := types.NamespacedName{ + key := k8stypes.NamespacedName{ Namespace: hr.Namespace, Name: hr.Name, } @@ -391,12 +392,12 @@ func (r *HTTPRouteReconciler) listHTTPRouteByHTTPRoutePolicy(ctx context.Context return nil } - var keys = make(map[types.NamespacedName]struct{}) + var keys = make(map[k8stypes.NamespacedName]struct{}) for _, ref := range httpRoutePolicy.Spec.TargetRefs { if ref.Kind != "HTTPRoute" { continue } - key := types.NamespacedName{ + key := k8stypes.NamespacedName{ Namespace: obj.GetNamespace(), Name: string(ref.Name), } @@ -441,10 +442,10 @@ func (r *HTTPRouteReconciler) listHTTPRouteForGenericEvent(ctx context.Context, } } -func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.TranslateContext, hrNN types.NamespacedName) error { +func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.TranslateContext, hrNN k8stypes.NamespacedName) error { var terr error for _, backend := range tctx.BackendRefs { - targetNN := types.NamespacedName{ + targetNN := k8stypes.NamespacedName{ Namespace: hrNN.Namespace, Name: string(backend.Name), } @@ -453,7 +454,7 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla } if backend.Kind != nil && *backend.Kind != "Service" { - terr = newInvalidKindError(*backend.Kind) + terr = types.NewInvalidKindError(*backend.Kind) continue } @@ -466,7 +467,7 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla if err := r.Get(tctx, targetNN, &service); err != nil { terr = err if client.IgnoreNotFound(err) == nil { - terr = ReasonError{ + terr = types.ReasonError{ Reason: string(gatewayv1.RouteReasonBackendNotFound), Message: fmt.Sprintf("Service %s not found", targetNN), } @@ -490,7 +491,7 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace), }, ); !permitted { - terr = ReasonError{ + terr = types.ReasonError{ Reason: string(v1beta1.RouteReasonRefNotPermitted), Message: fmt.Sprintf("%s is in a different namespace than the HTTPRoute %s and no ReferenceGrant allowing reference is configured", targetNN, hrNN), } @@ -549,7 +550,7 @@ func (r *HTTPRouteReconciler) processHTTPRoute(tctx *provider.TranslateContext, terror = err continue } - tctx.PluginConfigs[types.NamespacedName{ + tctx.PluginConfigs[k8stypes.NamespacedName{ Namespace: httpRoute.GetNamespace(), Name: string(filter.ExtensionRef.Name), }] = pluginconfig @@ -557,7 +558,7 @@ func (r *HTTPRouteReconciler) processHTTPRoute(tctx *provider.TranslateContext, } for _, backend := range rule.BackendRefs { if backend.Kind != nil && *backend.Kind != "Service" { - terror = newInvalidKindError(*backend.Kind) + terror = types.NewInvalidKindError(*backend.Kind) continue } tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{ @@ -659,7 +660,7 @@ func (r *HTTPRouteReconciler) listHTTPRoutesForReferenceGrant(ctx context.Contex var httpRouteList gatewayv1.HTTPRouteList if err := r.List(ctx, &httpRouteList); err != nil { - r.Log.Error(err, "failed to list httproutes for reference ReferenceGrant", "ReferenceGrant", types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) + r.Log.Error(err, "failed to list httproutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) return nil } diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 45cbf12c..0465f952 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -287,7 +287,7 @@ func SetRouteConditionResolvedRefs(routeParentStatus *gatewayv1.RouteParentStatu condition.Status = metav1.ConditionFalse condition.Message = err.Error() - var re ReasonError + var re types.ReasonError if errors.As(err, &re) { condition.Reason = re.Reason } @@ -436,7 +436,7 @@ func SetApisixCRDConditionAccepted(status *apiv2.ApisixStatus, generation int64, condition.Reason = string(apiv2.ConditionReasonInvalidSpec) condition.Message = err.Error() - var re ReasonError + var re types.ReasonError if errors.As(err, &re) { condition.Reason = re.Reason } @@ -984,36 +984,6 @@ func FullTypeName(a any) string { return path.Join(path.Dir(pkgPath), name) } -type ReasonError struct { - Reason string - Message string -} - -func (e ReasonError) Error() string { - return e.Message -} - -func IsSomeReasonError[Reason ~string](err error, reasons ...Reason) bool { - if err == nil { - return false - } - var re ReasonError - if !errors.As(err, &re) { - return false - } - if len(reasons) == 0 { - return true - } - return slices.Contains(reasons, Reason(re.Reason)) -} - -func newInvalidKindError[Kind ~string](kind Kind) ReasonError { - return ReasonError{ - Reason: string(gatewayv1.RouteReasonInvalidKind), - Message: fmt.Sprintf("Invalid kind %s, only Service is supported", kind), - } -} - // filterHostnames accepts a list of gateways and an HTTPRoute, and returns a copy of the HTTPRoute with only the hostnames that match the listener hostnames of the gateways. // If the HTTPRoute hostnames do not intersect with the listener hostnames of the gateways, it returns an ErrNoMatchingListenerHostname error. func filterHostnames(gateways []RouteParentRefContext, httpRoute *gatewayv1.HTTPRoute) (*gatewayv1.HTTPRoute, error) { diff --git a/internal/controller/utils/utils.go b/internal/controller/utils/utils.go new file mode 100644 index 00000000..a22babc1 --- /dev/null +++ b/internal/controller/utils/utils.go @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package utils + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +func SetApisixCRDConditionWithGeneration(status *apiv2.ApisixStatus, generation int64, condition metav1.Condition) { + condition.ObservedGeneration = generation + SetApisixCRDCondition(status, condition) +} + +func SetApisixCRDCondition(status *apiv2.ApisixStatus, condition metav1.Condition) { + for i, cond := range status.Conditions { + if cond.Type == condition.Type { + if cond.Status == condition.Status && + cond.ObservedGeneration > condition.ObservedGeneration { + return + } + status.Conditions[i] = condition + return + } + } + + status.Conditions = append(status.Conditions, condition) +} + +func NewConditionTypeAccepted(reason apiv2.ApisixRouteConditionReason, status bool, generation int64, msg string) metav1.Condition { + var condition = metav1.Condition{ + Type: string(apiv2.ConditionTypeAccepted), + Status: utils.ConditionStatus(status), + ObservedGeneration: generation, + LastTransitionTime: metav1.Now(), + Reason: string(reason), + Message: msg, + } + return condition +} + +func MergeCondition(conditions []metav1.Condition, newCondition metav1.Condition) []metav1.Condition { + if newCondition.LastTransitionTime.IsZero() { + newCondition.LastTransitionTime = metav1.Now() + } + newConditions := []metav1.Condition{} + for _, condition := range conditions { + if condition.Type != newCondition.Type { + newConditions = append(newConditions, condition) + } + } + newConditions = append(newConditions, newCondition) + return newConditions +} diff --git a/internal/manager/run.go b/internal/manager/run.go index bb34b9cb..54ee31c8 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -156,7 +156,7 @@ func Run(ctx context.Context, logger logr.Logger) error { return err } - provider, err := adc.New(&adc.Options{ + provider, err := adc.New(updater.Writer(), &adc.Options{ SyncTimeout: config.ControllerConfig.ExecADCTimeout.Duration, SyncPeriod: config.ControllerConfig.ProviderConfig.SyncPeriod.Duration, InitSyncDelay: config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration, diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index 0ab02d44..d3eb3f47 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -37,6 +37,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/label" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/provider/adc/translator" "github.com/apache/apisix-ingress-controller/internal/types" @@ -73,6 +74,9 @@ type adcClient struct { executor ADCExecutor Options + + updater status.Updater + statusUpdateMap map[types.NamespacedNameKind][]string } type Task struct { @@ -83,7 +87,7 @@ type Task struct { configs []adcConfig } -func New(opts ...Option) (provider.Provider, error) { +func New(updater status.Updater, opts ...Option) (provider.Provider, error) { o := Options{} o.ApplyOptions(opts) @@ -94,6 +98,7 @@ func New(opts ...Option) (provider.Provider, error) { parentRefs: make(map[types.NamespacedNameKind][]types.NamespacedNameKind), store: NewStore(), executor: &DefaultADCExecutor{}, + updater: updater, }, nil } @@ -318,6 +323,7 @@ func (d *adcClient) Sync(ctx context.Context) error { log.Debugw("syncing resources with multiple configs", zap.Any("configs", cfg)) + failedMap := map[string]types.ADCExecutionErrors{} var failedConfigs []string for name, config := range cfg { resources, err := d.store.GetResources(name) @@ -337,8 +343,13 @@ func (d *adcClient) Sync(ctx context.Context) error { }); err != nil { log.Errorw("failed to sync resources", zap.String("name", name), zap.Error(err)) failedConfigs = append(failedConfigs, name) + var execErrs types.ADCExecutionErrors + if errors.As(err, &execErrs) { + failedMap[name] = execErrs + } } } + d.handleADCExecutionErrors(failedMap) if len(failedConfigs) > 0 { return fmt.Errorf("failed to sync %d configs: %s", len(failedConfigs), @@ -363,15 +374,18 @@ func (d *adcClient) sync(ctx context.Context, task Task) error { args := BuildADCExecuteArgs(syncFilePath, task.Labels, task.ResourceTypes) - var failedConfigs []string + var errs types.ADCExecutionErrors for _, config := range task.configs { if err := d.executor.Execute(ctx, d.BackendMode, config, args); err != nil { log.Errorw("failed to execute adc command", zap.Error(err), zap.Any("config", config)) - failedConfigs = append(failedConfigs, config.Name) + var execErr types.ADCExecutionError + if errors.As(err, &execErr) { + errs.Errors = append(errs.Errors, execErr) + } } } - if len(failedConfigs) > 0 { - return fmt.Errorf("failed to execute adc command for configs: %s", strings.Join(failedConfigs, ", ")) + if len(errs.Errors) > 0 { + return errs } return nil } @@ -399,3 +413,8 @@ func prepareSyncFile(resources any) (string, func(), error) { return tmpFile.Name(), cleanup, nil } + +func (d *adcClient) handleADCExecutionErrors(statusesMap map[string]types.ADCExecutionErrors) { + statusUpdateMap := d.resolveADCExecutionErrors(statusesMap) + d.handleStatusUpdate(statusUpdateMap) +} diff --git a/internal/provider/adc/executor.go b/internal/provider/adc/executor.go index bd7b6c7f..857ef14d 100644 --- a/internal/provider/adc/executor.go +++ b/internal/provider/adc/executor.go @@ -33,6 +33,7 @@ import ( "go.uber.org/zap" adctypes "github.com/apache/apisix-ingress-controller/api/adc" + "github.com/apache/apisix-ingress-controller/internal/types" ) type ADCExecutor interface { @@ -51,15 +52,26 @@ func (e *DefaultADCExecutor) Execute(ctx context.Context, mode string, config ad } func (e *DefaultADCExecutor) runADC(ctx context.Context, mode string, config adcConfig, args []string) error { - var failedAddrs []string + var execErrs = types.ADCExecutionError{ + Name: config.Name, + } + for _, addr := range config.ServerAddrs { if err := e.runForSingleServerWithTimeout(ctx, addr, mode, config, args); err != nil { log.Errorw("failed to run adc for server", zap.String("server", addr), zap.Error(err)) - failedAddrs = append(failedAddrs, addr) + var execErr types.ADCExecutionServerAddrError + if errors.As(err, &execErr) { + execErrs.FailedErrors = append(execErrs.FailedErrors, execErr) + } else { + execErrs.FailedErrors = append(execErrs.FailedErrors, types.ADCExecutionServerAddrError{ + ServerAddr: addr, + Err: err.Error(), + }) + } } } - if len(failedAddrs) > 0 { - return fmt.Errorf("failed to run adc for servers: [%s]", strings.Join(failedAddrs, ", ")) + if len(execErrs.FailedErrors) > 0 { + return execErrs } return nil } @@ -95,7 +107,25 @@ func (e *DefaultADCExecutor) runForSingleServer(ctx context.Context, serverAddr, return e.buildCmdError(err, stdout.Bytes(), stderr.Bytes()) } - return e.handleOutput(stdout.Bytes()) + result, err := e.handleOutput(stdout.Bytes()) + if err != nil { + log.Errorw("failed to handle adc output", + zap.Error(err), + zap.String("stdout", stdout.String()), + zap.String("stderr", stderr.String()), + ) + return fmt.Errorf("failed to handle adc output: %w", err) + } + if result.FailedCount > 0 && len(result.Failed) > 0 { + log.Errorw("adc sync failed", zap.Any("result", result)) + return types.ADCExecutionServerAddrError{ + ServerAddr: serverAddr, + Err: result.Failed[0].Reason, + FailedStatuses: result.Failed, + } + } + log.Debugw("adc sync success", zap.Any("result", result)) + return nil } func (e *DefaultADCExecutor) prepareEnv(serverAddr, mode, token string) []string { @@ -121,7 +151,7 @@ func (e *DefaultADCExecutor) buildCmdError(runErr error, stdout, stderr []byte) return errors.New("failed to sync resources: " + errMsg + ", exit err: " + runErr.Error()) } -func (e *DefaultADCExecutor) handleOutput(output []byte) error { +func (e *DefaultADCExecutor) handleOutput(output []byte) (*adctypes.SyncResult, error) { var result adctypes.SyncResult log.Debugf("adc output: %s", string(output)) if lines := bytes.Split(output, []byte{'\n'}); len(lines) > 0 { @@ -132,16 +162,10 @@ func (e *DefaultADCExecutor) handleOutput(output []byte) error { zap.Error(err), zap.String("stdout", string(output)), ) - return errors.New("failed to parse adc result: " + err.Error()) - } - - if result.FailedCount > 0 && len(result.Failed) > 0 { - log.Errorw("adc sync failed", zap.Any("result", result)) - return errors.New(result.Failed[0].Reason) + return nil, errors.New("failed to parse adc result: " + err.Error()) } - log.Debugw("adc sync success", zap.Any("result", result)) - return nil + return &result, nil } func BuildADCExecuteArgs(filePath string, labels map[string]string, types []string) []string { diff --git a/internal/provider/adc/status.go b/internal/provider/adc/status.go new file mode 100644 index 00000000..8d1f70f8 --- /dev/null +++ b/internal/provider/adc/status.go @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package adc + +import ( + "fmt" + "strings" + + "github.com/api7/gopkg/pkg/log" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/controller/label" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + cutils "github.com/apache/apisix-ingress-controller/internal/controller/utils" + "github.com/apache/apisix-ingress-controller/internal/types" +) + +// handleStatusUpdate updates resource conditions based on the latest sync results. +// +// It maintains a history of failed resources in d.statusUpdateMap. +// +// For resources in the current failure map (statusUpdateMap), it marks them as failed. +// For resources that exist only in the previous failure history (i.e. not in this sync's failures), +// it marks them as accepted (success). +func (d *adcClient) handleStatusUpdate(statusUpdateMap map[types.NamespacedNameKind][]string) { + // Mark all resources in the current failure set as failed. + for nnk, msgs := range statusUpdateMap { + d.updateStatus(nnk, cutils.NewConditionTypeAccepted( + apiv2.ConditionReasonSyncFailed, + false, + 0, + strings.Join(msgs, "; "), + )) + } + + // Mark resources that exist only in the previous failure history as successful. + for nnk := range d.statusUpdateMap { + if _, ok := statusUpdateMap[nnk]; !ok { + d.updateStatus(nnk, cutils.NewConditionTypeAccepted( + apiv2.ConditionReasonAccepted, + true, + 0, + "", + )) + } + } + // Update the failure history with the current failure set. + d.statusUpdateMap = statusUpdateMap +} + +func (d *adcClient) updateStatus(nnk types.NamespacedNameKind, condition metav1.Condition) { + switch nnk.Kind { + case types.KindApisixRoute: + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &apiv2.ApisixRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*apiv2.ApisixRoute).DeepCopy() + cutils.SetApisixCRDConditionWithGeneration(&cp.Status, cp.GetGeneration(), condition) + return cp + }), + }) + case types.KindApisixGlobalRule: + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &apiv2.ApisixGlobalRule{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*apiv2.ApisixGlobalRule).DeepCopy() + cutils.SetApisixCRDConditionWithGeneration(&cp.Status, cp.GetGeneration(), condition) + return cp + }), + }) + case types.KindApisixTls: + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &apiv2.ApisixTls{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*apiv2.ApisixTls).DeepCopy() + cutils.SetApisixCRDConditionWithGeneration(&cp.Status, cp.GetGeneration(), condition) + return cp + }), + }) + case types.KindApisixConsumer: + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &apiv2.ApisixConsumer{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*apiv2.ApisixConsumer).DeepCopy() + cutils.SetApisixCRDConditionWithGeneration(&cp.Status, cp.GetGeneration(), condition) + return cp + }), + }) + case types.KindHTTPRoute: + parentRefs := d.getParentRefs(nnk) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1.HTTPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1.HTTPRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) + } +} + +func (d *adcClient) resolveADCExecutionErrors( + statusesMap map[string]types.ADCExecutionErrors, +) map[types.NamespacedNameKind][]string { + statusUpdateMap := map[types.NamespacedNameKind][]string{} + for configName, execErrors := range statusesMap { + for _, execErr := range execErrors.Errors { + for _, failedStatus := range execErr.FailedErrors { + if len(failedStatus.FailedStatuses) == 0 { + d.handleEmptyFailedStatuses(configName, failedStatus, statusUpdateMap) + } else { + d.handleDetailedFailedStatuses(configName, failedStatus, statusUpdateMap) + } + } + } + } + + return statusUpdateMap +} + +func (d *adcClient) handleEmptyFailedStatuses( + configName string, + failedStatus types.ADCExecutionServerAddrError, + statusUpdateMap map[types.NamespacedNameKind][]string, +) { + resource, err := d.store.GetResources(configName) + if err != nil { + log.Errorw("failed to get resources from store", zap.String("configName", configName), zap.Error(err)) + return + } + + for _, obj := range resource.Services { + d.addResourceToStatusUpdateMap(obj.GetLabels(), failedStatus.Error(), statusUpdateMap) + } + + for _, obj := range resource.Consumers { + d.addResourceToStatusUpdateMap(obj.GetLabels(), failedStatus.Error(), statusUpdateMap) + } + + for _, obj := range resource.SSLs { + d.addResourceToStatusUpdateMap(obj.GetLabels(), failedStatus.Error(), statusUpdateMap) + } + + globalRules, err := d.store.ListGlobalRules(configName) + if err != nil { + log.Errorw("failed to list global rules", zap.String("configName", configName), zap.Error(err)) + return + } + for _, rule := range globalRules { + d.addResourceToStatusUpdateMap(rule.GetLabels(), failedStatus.Error(), statusUpdateMap) + } +} + +func (d *adcClient) handleDetailedFailedStatuses( + configName string, + failedStatus types.ADCExecutionServerAddrError, + statusUpdateMap map[types.NamespacedNameKind][]string, +) { + for _, status := range failedStatus.FailedStatuses { + id := status.Event.ResourceID + labels, err := d.store.GetResourceLabel(configName, status.Event.ResourceType, id) + if err != nil { + log.Errorw("failed to get resource label", + zap.String("configName", configName), + zap.String("resourceType", status.Event.ResourceType), + zap.String("id", id), + zap.Error(err), + ) + continue + } + d.addResourceToStatusUpdateMap( + labels, + fmt.Sprintf("ServerAddr: %s, Error: %s", failedStatus.ServerAddr, status.Reason), + statusUpdateMap, + ) + } +} + +func (d *adcClient) addResourceToStatusUpdateMap( + labels map[string]string, + msg string, + statusUpdateMap map[types.NamespacedNameKind][]string, +) { + statusKey := types.NamespacedNameKind{ + Name: labels[label.LabelName], + Namespace: labels[label.LabelNamespace], + Kind: labels[label.LabelKind], + } + statusUpdateMap[statusKey] = append(statusUpdateMap[statusKey], msg) +} diff --git a/internal/provider/adc/store.go b/internal/provider/adc/store.go index 62c554dc..d5626c93 100644 --- a/internal/provider/adc/store.go +++ b/internal/provider/adc/store.go @@ -18,6 +18,7 @@ package adc import ( + "fmt" "sync" "github.com/api7/gopkg/pkg/log" @@ -63,7 +64,7 @@ func (s *Store) Insert(name string, resourceTypes []string, resources adctypes.R } for _, resourceType := range resourceTypes { switch resourceType { - case "service": + case adctypes.TypeService: services, err := targetCache.ListServices(selector) if err != nil { return err @@ -78,7 +79,7 @@ func (s *Store) Insert(name string, resourceTypes []string, resources adctypes.R return err } } - case "consumer": + case adctypes.TypeConsumer: consumers, err := targetCache.ListConsumers(selector) if err != nil { return err @@ -93,7 +94,7 @@ func (s *Store) Insert(name string, resourceTypes []string, resources adctypes.R return err } } - case "ssl": + case adctypes.TypeSSL: ssls, err := targetCache.ListSSL(selector) if err != nil { return err @@ -109,7 +110,7 @@ func (s *Store) Insert(name string, resourceTypes []string, resources adctypes.R return err } } - case "global_rule": + case adctypes.TypeGlobalRule: // List existing global rules that match the selector globalRules, err := targetCache.ListGlobalRules(selector) if err != nil { @@ -136,7 +137,7 @@ func (s *Store) Insert(name string, resourceTypes []string, resources adctypes.R return err } } - case "plugin_metadata": + case adctypes.TypePluginMetadata: s.pluginMetadataMap[name] = resources.PluginMetadata default: continue @@ -159,7 +160,7 @@ func (s *Store) Delete(name string, resourceTypes []string, Labels map[string]st } for _, resourceType := range resourceTypes { switch resourceType { - case "service": + case adctypes.TypeService: services, err := targetCache.ListServices(selector) if err != nil { log.Errorf("failed to list services: %v", err) @@ -169,7 +170,7 @@ func (s *Store) Delete(name string, resourceTypes []string, Labels map[string]st log.Errorf("failed to delete service %s: %v", service.ID, err) } } - case "ssl": + case adctypes.TypeSSL: ssls, err := targetCache.ListSSL(selector) if err != nil { log.Errorf("failed to list ssl: %v", err) @@ -179,7 +180,7 @@ func (s *Store) Delete(name string, resourceTypes []string, Labels map[string]st log.Errorf("failed to delete ssl %s: %v", ssl.ID, err) } } - case "consumer": + case adctypes.TypeConsumer: consumers, err := targetCache.ListConsumers(selector) if err != nil { log.Errorf("failed to list consumers: %v", err) @@ -189,7 +190,7 @@ func (s *Store) Delete(name string, resourceTypes []string, Labels map[string]st log.Errorf("failed to delete consumer %s: %v", consumer.Username, err) } } - case "global_rule": + case adctypes.TypeGlobalRule: globalRules, err := targetCache.ListGlobalRules(selector) if err != nil { log.Errorf("failed to list global rules: %v", err) @@ -199,7 +200,7 @@ func (s *Store) Delete(name string, resourceTypes []string, Labels map[string]st log.Errorf("failed to delete global rule %s: %v", globalRule.ID, err) } } - case "plugin_metadata": + case adctypes.TypePluginMetadata: delete(s.pluginMetadataMap, name) } } @@ -244,3 +245,75 @@ func (s *Store) GetResources(name string) (*adctypes.Resources, error) { PluginMetadata: metadata, }, nil } + +func (s *Store) ListGlobalRules(name string) ([]*adctypes.GlobalRuleItem, error) { + s.Lock() + defer s.Unlock() + targetCache, ok := s.cacheMap[name] + if !ok { + return nil, fmt.Errorf("cache not found for name: %s", name) + } + globalRules, err := targetCache.ListGlobalRules() + if err != nil { + return nil, fmt.Errorf("failed to list global rules: %w", err) + } + return globalRules, nil +} + +func (s *Store) GetResourceLabel(name, resourceType string, id string) (map[string]string, error) { + s.Lock() + defer s.Unlock() + targetCache, ok := s.cacheMap[name] + if !ok { + return nil, fmt.Errorf("cache not found for name: %s", name) + } + switch resourceType { + case adctypes.TypeService: + service, err := targetCache.GetService(id) + if err != nil { + return nil, fmt.Errorf("failed to get service: %w", err) + } + return service.Labels, nil + case adctypes.TypeRoute: + services, err := targetCache.ListServices() + if err != nil { + return nil, fmt.Errorf("failed to list services: %w", err) + } + for _, service := range services { + for _, route := range service.Routes { + if route.ID == id { + // Return labels from the service that contains the route + return route.GetLabels(), nil + } + } + } + return nil, fmt.Errorf("route not found: %s", id) + case adctypes.TypeSSL: + ssl, err := targetCache.GetSSL(id) + if err != nil { + return nil, err + } + if ssl != nil { + return ssl.GetLabels(), nil + } + case adctypes.TypeConsumer: + consumer, err := targetCache.GetConsumer(id) + if err != nil { + return nil, err + } + if consumer != nil { + return consumer.Labels, nil + } + case adctypes.TypeGlobalRule: + globalRule, err := targetCache.GetGlobalRule(id) + if err != nil { + return nil, err + } + if globalRule != nil { + return globalRule.GetLabels(), nil + } + default: + return nil, fmt.Errorf("unknown resource type: %s", resourceType) + } + return nil, nil +} diff --git a/internal/types/error.go b/internal/types/error.go new file mode 100644 index 00000000..80dbf568 --- /dev/null +++ b/internal/types/error.go @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package types + +import ( + "errors" + "fmt" + "slices" + "strings" + + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/apache/apisix-ingress-controller/api/adc" +) + +type ReasonError struct { + Reason string + Message string +} + +func (e ReasonError) Error() string { + return e.Message +} + +func IsSomeReasonError[Reason ~string](err error, reasons ...Reason) bool { + if err == nil { + return false + } + var re ReasonError + if !errors.As(err, &re) { + return false + } + if len(reasons) == 0 { + return true + } + return slices.Contains(reasons, Reason(re.Reason)) +} + +func NewInvalidKindError[Kind ~string](kind Kind) ReasonError { + return ReasonError{ + Reason: string(gatewayv1.RouteReasonInvalidKind), + Message: fmt.Sprintf("Invalid kind %s, only Service is supported", kind), + } +} + +type ADCExecutionErrors struct { + Errors []ADCExecutionError +} + +func (e ADCExecutionErrors) Error() string { + messages := make([]string, 0, len(e.Errors)) + for _, err := range e.Errors { + messages = append(messages, err.Error()) + } + return fmt.Sprintf("ADC execution errors: [%s]", strings.Join(messages, "; ")) +} + +type ADCExecutionError struct { + Name string + FailedErrors []ADCExecutionServerAddrError +} + +func (e ADCExecutionError) Error() string { + messages := make([]string, 0, len(e.FailedErrors)) + for _, failed := range e.FailedErrors { + messages = append(messages, failed.Error()) + } + return fmt.Sprintf("ADC execution error for %s: [%s]", e.Name, strings.Join(messages, "; ")) +} + +type ADCExecutionServerAddrError struct { + Err string + ServerAddr string + FailedStatuses []adc.SyncStatus +} + +func (e ADCExecutionServerAddrError) Error() string { + return fmt.Sprintf("ServerAddr: %s, Err: %s", e.ServerAddr, e.Err) +} diff --git a/internal/types/types.go b/internal/types/k8s.go similarity index 55% copy from internal/types/types.go copy to internal/types/k8s.go index d501db75..4bdf1e93 100644 --- a/internal/types/types.go +++ b/internal/types/k8s.go @@ -17,8 +17,21 @@ package types -type NamespacedNameKind struct { - Namespace string - Name string - Kind string -} +const DefaultIngressClassAnnotation = "ingressclass.kubernetes.io/is-default-class" + +const ( + KindGateway = "Gateway" + KindHTTPRoute = "HTTPRoute" + KindGatewayClass = "GatewayClass" + KindIngress = "Ingress" + KindIngressClass = "IngressClass" + KindGatewayProxy = "GatewayProxy" + KindSecret = "Secret" + KindService = "Service" + KindApisixRoute = "ApisixRoute" + KindApisixGlobalRule = "ApisixGlobalRule" + KindApisixPluginConfig = "ApisixPluginConfig" + KindPod = "Pod" + KindApisixTls = "ApisixTls" + KindApisixConsumer = "ApisixConsumer" +) diff --git a/internal/types/types.go b/internal/types/types.go index d501db75..f695da87 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -17,8 +17,19 @@ package types +import ( + k8stypes "k8s.io/apimachinery/pkg/types" +) + type NamespacedNameKind struct { Namespace string Name string Kind string } + +func (n *NamespacedNameKind) NamespacedName() k8stypes.NamespacedName { + return k8stypes.NamespacedName{ + Namespace: n.Namespace, + Name: n.Name, + } +} diff --git a/internal/utils/k8s.go b/internal/utils/k8s.go index 023831f1..0bca19d0 100644 --- a/internal/utils/k8s.go +++ b/internal/utils/k8s.go @@ -21,6 +21,7 @@ import ( "net" "regexp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -91,3 +92,10 @@ func IsSubsetOf(a, b map[string]string) bool { } return true } + +func ConditionStatus(status bool) metav1.ConditionStatus { + if status { + return metav1.ConditionTrue + } + return metav1.ConditionFalse +} diff --git a/test/e2e/apisix/status.go b/test/e2e/apisix/status.go new file mode 100644 index 00000000..1c72e412 --- /dev/null +++ b/test/e2e/apisix/status.go @@ -0,0 +1,342 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apisix + +import ( + "fmt" + "os" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" + + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/test/e2e/framework" + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = Describe("Test CRD Status", Label("apisix.apache.org", "v2", "apisixroute"), func() { + var ( + s = scaffold.NewScaffold(&scaffold.Options{ + ControllerName: "apisix.apache.org/apisix-ingress-controller", + }) + applier = framework.NewApplier(s.GinkgoT, s.K8sClient, s.CreateResourceFromString) + ) + + assertion := func(actualOrCtx any, args ...any) AsyncAssertion { + return Eventually(actualOrCtx).WithArguments(args...).WithTimeout(30 * time.Second).ProbeEvery(time.Second) + } + + Context("Test ApisixRoute Sync Status", func() { + BeforeEach(func() { + By("create GatewayProxy") + gatewayProxy := fmt.Sprintf(gatewayProxyYaml, s.Deployer.GetAdminEndpoint(), s.AdminKey()) + err := s.CreateResourceFromStringWithNamespace(gatewayProxy, "default") + Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") + time.Sleep(5 * time.Second) + + By("create IngressClass") + err = s.CreateResourceFromStringWithNamespace(ingressClassYaml, "") + Expect(err).NotTo(HaveOccurred(), "creating IngressClass") + time.Sleep(5 * time.Second) + }) + const ar = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + hosts: + - httpbin + paths: + - /* + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + const arWithInvalidPlugin = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + hosts: + - httpbin + paths: + - /* + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 + plugins: + - name: non-existent-plugin + enable: true +` + + getRequest := func(path string) func() int { + return func() int { + return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode + } + } + + It("unknown plugin", func() { + if os.Getenv("PROVIDER_TYPE") == "apisix-standalone" { + Skip("apisix standalone does not validate unknown plugins") + } + By("apply ApisixRoute with valid plugin") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, arWithInvalidPlugin) + + By("check ApisixRoute status") + assertion(func() string { + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml") + return output + }).Should( + And( + ContainSubstring(`status: "False"`), + ContainSubstring(`reason: SyncFailed`), + ContainSubstring(`unknown plugin [non-existent-plugin]`), + ), + ) + + By("Update ApisixRoute") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, ar) + + By("check ApisixRoute status") + assertion(func() string { + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml") + return output + }).Should( + And( + ContainSubstring(`status: "True"`), + ContainSubstring(`reason: Accepted`), + ), + ) + + By("check route in APISIX") + assertion(getRequest("/get")).Should(Equal(200), "should be able to access the route") + }) + + It("dataplane unavailable", func() { + By("apply ApisixRoute") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, ar) + + By("check ApisixRoute status") + assertion(func() string { + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml") + return output + }).Should( + And( + ContainSubstring(`status: "True"`), + ContainSubstring(`reason: Accepted`), + ), + ) + + By("check route in APISIX") + assertion(getRequest("/get")).Should(Equal(200), "should be able to access the route") + + s.Deployer.ScaleDataplane(0) + + By("check ApisixRoute status") + assertion(func() string { + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml") + return output + }).Should( + And( + ContainSubstring(`status: "False"`), + ContainSubstring(`reason: SyncFailed`), + ), + ) + + s.Deployer.ScaleDataplane(1) + + By("check ApisixRoute status after scaling up") + assertion(func() string { + output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml") + return output + }).Should( + And( + ContainSubstring(`status: "True"`), + ContainSubstring(`reason: Accepted`), + ), + ) + + By("check route in APISIX") + assertion(getRequest("/get")).Should(Equal(200), "should be able to access the route") + }) + }) + + Context("Test HTTPRoute Sync Status", func() { + const httproute = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin +spec: + parentRefs: + - name: apisix + hostnames: + - "httpbin" + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + const gatewayClass = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: GatewayClass +metadata: + name: %s +spec: + controllerName: %s +` + const gatewayProxy = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: GatewayProxy +metadata: + name: apisix-proxy-config +spec: + provider: + type: ControlPlane + controlPlane: + endpoints: + - %s + auth: + type: AdminKey + adminKey: + value: "%s" +` + const defaultGateway = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: apisix +spec: + gatewayClassName: %s + listeners: + - name: http1 + protocol: HTTP + port: 80 + infrastructure: + parametersRef: + group: apisix.apache.org + kind: GatewayProxy + name: apisix-proxy-config +` + BeforeEach(func() { + By("create GatewayProxy") + gatewayProxy := fmt.Sprintf(gatewayProxy, s.Deployer.GetAdminEndpoint(), s.AdminKey()) + err := s.CreateResourceFromString(gatewayProxy) + Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") + time.Sleep(5 * time.Second) + + By("create GatewayClass") + gatewayClassName := fmt.Sprintf("apisix-%d", time.Now().Unix()) + err = s.CreateResourceFromStringWithNamespace(fmt.Sprintf(gatewayClass, gatewayClassName, s.GetControllerName()), "") + Expect(err).NotTo(HaveOccurred(), "creating GatewayClass") + time.Sleep(5 * time.Second) + + By("create Gateway") + err = s.CreateResourceFromString(fmt.Sprintf(defaultGateway, gatewayClassName)) + Expect(err).NotTo(HaveOccurred(), "creating Gateway") + time.Sleep(5 * time.Second) + + By("check Gateway condition") + gwyaml, err := s.GetResourceYaml("Gateway", "apisix") + Expect(err).NotTo(HaveOccurred(), "getting Gateway yaml") + Expect(gwyaml).To(ContainSubstring(`status: "True"`), "checking Gateway condition status") + Expect(gwyaml).To(ContainSubstring("message: the gateway has been accepted by the apisix-ingress-controller"), "checking Gateway condition message") + }) + AfterEach(func() { + _ = s.DeleteResource("Gateway", "apisix") + }) + getRequest := func(path string) func() int { + return func() int { + return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode + } + } + var resourceApplied = func(resourType, resourceName, resourceRaw string, observedGeneration int) { + Expect(s.CreateResourceFromString(resourceRaw)). + NotTo(HaveOccurred(), fmt.Sprintf("creating %s", resourType)) + + Eventually(func() string { + hryaml, err := s.GetResourceYaml(resourType, resourceName) + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("getting %s yaml", resourType)) + return hryaml + }, "8s", "2s"). + Should( + SatisfyAll( + ContainSubstring(`status: "True"`), + ContainSubstring(fmt.Sprintf("observedGeneration: %d", observedGeneration)), + ), + fmt.Sprintf("checking %s condition status", resourType), + ) + time.Sleep(5 * time.Second) + } + + It("dataplane unavailable", func() { + By("Create HTTPRoute") + resourceApplied("HTTPRoute", "httpbin", httproute, 1) + + By("check route in APISIX") + assertion(getRequest("/get")).Should(Equal(200), "should be able to access the route") + + s.Deployer.ScaleDataplane(0) + time.Sleep(10 * time.Second) + + By("check ApisixRoute status") + assertion(func() string { + output, _ := s.GetOutputFromString("httproute", "httpbin", "-o", "yaml") + return output + }).Should( + And( + ContainSubstring(`status: "False"`), + ContainSubstring(`reason: SyncFailed`), + ), + ) + + s.Deployer.ScaleDataplane(1) + time.Sleep(10 * time.Second) + + By("check ApisixRoute status after scaling up") + assertion(func() string { + output, _ := s.GetOutputFromString("httproute", "httpbin", "-o", "yaml") + return output + }).Should( + And( + ContainSubstring(`status: "True"`), + ContainSubstring(`reason: Accepted`), + ), + ) + + By("check route in APISIX") + assertion(getRequest("/get")).Should(Equal(200), "should be able to access the route") + }) + }) +}) diff --git a/test/e2e/framework/manifests/ingress.yaml b/test/e2e/framework/manifests/ingress.yaml index 1ed00c10..c4bb1014 100644 --- a/test/e2e/framework/manifests/ingress.yaml +++ b/test/e2e/framework/manifests/ingress.yaml @@ -327,6 +327,7 @@ data: controller_name: {{ .ControllerName | default "apisix.apache.org/apisix-ingress-controller" }} leader_election_id: "apisix-ingress-controller-leader" + provider: type: {{ .ProviderType | default "apisix" }} sync_period: {{ .ProviderSyncPeriod | default "0s" }} diff --git a/test/e2e/scaffold/apisix_deployer.go b/test/e2e/scaffold/apisix_deployer.go index 67f20a86..b6fe7feb 100644 --- a/test/e2e/scaffold/apisix_deployer.go +++ b/test/e2e/scaffold/apisix_deployer.go @@ -250,6 +250,12 @@ func (s *APISIXDeployer) deployDataplane(opts *APISIXDeployOptions) *corev1.Serv return svc } +func (s *APISIXDeployer) ScaleDataplane(replicas int) { + s.DeployDataplane(DeployDataplaneOptions{ + Replicas: ptr.To(replicas), + }) +} + func (s *APISIXDeployer) DeployIngress() { s.Framework.DeployIngress(framework.IngressDeployOpts{ ControllerName: s.opts.ControllerName, diff --git a/test/e2e/scaffold/deployer.go b/test/e2e/scaffold/deployer.go index cfbe4043..94d740cd 100644 --- a/test/e2e/scaffold/deployer.go +++ b/test/e2e/scaffold/deployer.go @@ -26,6 +26,7 @@ type Deployer interface { DeployDataplane(opts DeployDataplaneOptions) DeployIngress() ScaleIngress(replicas int) + ScaleDataplane(replicas int) BeforeEach() AfterEach() CreateAdditionalGateway(namePrefix string) (string, *corev1.Service, error)