This is an automated email from the ASF dual-hosted git repository.

jimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-k8s.git


The following commit(s) were added to refs/heads/master by this push:
     new e079981  feat: enhance SeataServer status with categorized error 
tracking (#26)
e079981 is described below

commit e079981215a131e95d81b18a263271ca49e98a29
Author: xinfan.wu <13708123...@163.com>
AuthorDate: Mon Mar 10 10:38:07 2025 +0800

    feat: enhance SeataServer status with categorized error tracking (#26)
---
 api/v1alpha1/seataserver_types.go                  |  28 +++++-
 .../operator.seata.apache.org_seataservers.yaml    |  17 ++++
 controllers/seataserver_controller.go              | 112 +++++++++++++++------
 3 files changed, 121 insertions(+), 36 deletions(-)

diff --git a/api/v1alpha1/seataserver_types.go 
b/api/v1alpha1/seataserver_types.go
index 6fc2ccf..50d2a92 100644
--- a/api/v1alpha1/seataserver_types.go
+++ b/api/v1alpha1/seataserver_types.go
@@ -76,11 +76,33 @@ func (s *SeataServerSpec) withDefaults() (changed bool) {
        return changed
 }
 
+type ServerErrorType string
+
+const (
+       ErrorTypeK8s_SeataServer     ServerErrorType = "k8s-seata-server"
+       ErrorTypeK8s_HeadlessService ServerErrorType = "k8s-headless-service"
+       ErrorTypeK8s_Pvc             ServerErrorType = "k8s-pvc"
+       ErrorTypeK8s_StatefulSet     ServerErrorType = "k8s-statefulset"
+       ErrorTypeRuntime             ServerErrorType = "runtime"
+)
+
+func (e ServerErrorType) String() string {
+       return string(e)
+}
+
+// SeataServerError defines the error of SeataServer
+type SeataServerError struct {
+       Type      string      `json:"type"`
+       Message   string      `json:"message"`
+       Timestamp metav1.Time `json:"timestamp"`
+}
+
 // SeataServerStatus defines the observed state of SeataServer
 type SeataServerStatus struct {
-       Synchronized  bool  `json:"synchronized"`
-       Replicas      int32 `json:"replicas"`
-       ReadyReplicas int32 `json:"readyReplicas,omitempty"`
+       Synchronized  bool               `json:"synchronized"`
+       Replicas      int32              `json:"replicas"`
+       ReadyReplicas int32              `json:"readyReplicas,omitempty"`
+       Errors        []SeataServerError `json:"errors,omitempty"`
 }
 
 //+kubebuilder:object:root=true
diff --git a/config/crd/bases/operator.seata.apache.org_seataservers.yaml 
b/config/crd/bases/operator.seata.apache.org_seataservers.yaml
index 92e51dc..a43d693 100644
--- a/config/crd/bases/operator.seata.apache.org_seataservers.yaml
+++ b/config/crd/bases/operator.seata.apache.org_seataservers.yaml
@@ -461,6 +461,23 @@ spec:
           status:
             description: SeataServerStatus defines the observed state of 
SeataServer
             properties:
+              errors:
+                items:
+                  description: SeataServerError defines the error of 
SeataServer
+                  properties:
+                    message:
+                      type: string
+                    timestamp:
+                      format: date-time
+                      type: string
+                    type:
+                      type: string
+                  required:
+                  - message
+                  - timestamp
+                  - type
+                  type: object
+                type: array
               readyReplicas:
                 format: int32
                 type: integer
diff --git a/controllers/seataserver_controller.go 
b/controllers/seataserver_controller.go
index 0e83928..0e02335 100644
--- a/controllers/seataserver_controller.go
+++ b/controllers/seataserver_controller.go
@@ -20,34 +20,40 @@ package controllers
 import (
        "context"
        "fmt"
-       "github.com/apache/seata-k8s/pkg/seata"
-       "github.com/apache/seata-k8s/pkg/utils"
+       "time"
+
+       "github.com/go-logr/logr"
        appsv1 "k8s.io/api/apps/v1"
        apiv1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/types"
+       "k8s.io/client-go/util/retry"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
-       "sigs.k8s.io/controller-runtime/pkg/log"
        "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
-       "time"
 
        seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
+       "github.com/apache/seata-k8s/pkg/seata"
+       "github.com/apache/seata-k8s/pkg/utils"
 )
 
 // SeataServerReconciler reconciles a SeataServer object
 type SeataServerReconciler struct {
        client.Client
        Scheme *runtime.Scheme
+       Log    logr.Logger
 }
 
 type reconcileFun func(ctx context.Context, s *seatav1alpha1.SeataServer) error
 
-const RequeueSeconds = 10
+const (
+       RequeueSeconds        = 10
+       MaxRecentErrorRecords = 5
+)
 
 
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers,verbs=get;list;watch;create;update;patch;delete
 
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/status,verbs=get;update;patch
@@ -68,24 +74,19 @@ const RequeueSeconds = 10
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.1/pkg/reconcile
 func (r *SeataServerReconciler) Reconcile(ctx context.Context, req 
ctrl.Request) (ctrl.Result, error) {
-       logger := log.FromContext(ctx)
-
        s := &seatav1alpha1.SeataServer{}
        if err := r.Get(ctx, req.NamespacedName, s); err != nil {
                if errors.IsNotFound(err) {
-                       logger.Info(fmt.Sprintf("SeataServer(%v) resource not 
found", req.NamespacedName))
+                       r.recordError(ctx, req.NamespacedName, 
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to get resource 
SeataServer(%v),err %s", req.NamespacedName, err.Error()), err)
                        return ctrl.Result{}, nil
                }
-
-               logger.Error(err, fmt.Sprintf("Failed to get resource 
SeataServer(%v)", req.NamespacedName))
-               return ctrl.Result{}, err
        }
 
        changed := s.WithDefaults()
        if changed {
-               logger.Info(fmt.Sprintf("Setting default values for 
SeataServer(%v)", req.NamespacedName))
+               r.Log.Info(fmt.Sprintf("Setting default values for 
SeataServer(%v)", req.NamespacedName))
                if err := r.Client.Update(ctx, s); err != nil {
-                       logger.Error(err, fmt.Sprintf("Failed to update 
resource SeataServer(%v)", req.NamespacedName))
+                       r.recordError(ctx, req.NamespacedName, 
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource 
SeataServer(%v)", req.NamespacedName), err)
                        return reconcile.Result{}, err
                }
                return reconcile.Result{Requeue: true}, nil
@@ -102,7 +103,7 @@ func (r *SeataServerReconciler) Reconcile(ctx 
context.Context, req ctrl.Request)
        }
 
        if !s.Status.Synchronized {
-               logger.Info(fmt.Sprintf("SeataServer(%v) has not been 
synchronized yet, requeue in %d seconds",
+               r.Log.Info(fmt.Sprintf("SeataServer(%v) has not been 
synchronized yet, requeue in %d seconds",
                        req.NamespacedName, RequeueSeconds))
                return ctrl.Result{Requeue: true, RequeueAfter: RequeueSeconds 
* time.Second}, nil
        }
@@ -111,10 +112,10 @@ func (r *SeataServerReconciler) Reconcile(ctx 
context.Context, req ctrl.Request)
 }
 
 func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, 
s *seatav1alpha1.SeataServer) (err error) {
-       logger := log.FromContext(ctx)
-
        svc := seata.MakeHeadlessService(s)
        if err := controllerutil.SetControllerReference(s, svc, r.Scheme); err 
!= nil {
+               r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                       seatav1alpha1.ErrorTypeK8s_HeadlessService, 
fmt.Sprintf("Failed to set owner reference for SeataServer(%v)", s.Name), err)
                return err
        }
        foundSvc := &apiv1.Service{}
@@ -123,22 +124,30 @@ func (r *SeataServerReconciler) 
reconcileHeadlessService(ctx context.Context, s
                Namespace: svc.Namespace,
        }, foundSvc)
        if err != nil && errors.IsNotFound(err) {
-               logger.Info(fmt.Sprintf("Creating a new SeataServer Service 
{%s:%s}",
+               r.Log.Info(fmt.Sprintf("Creating a new SeataServer Service 
{%s:%s}",
                        svc.Namespace, svc.Name))
                err = r.Client.Create(ctx, svc)
                if err != nil {
+                       r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                               seatav1alpha1.ErrorTypeK8s_HeadlessService, 
fmt.Sprintf("Failed to create SeataServer Service {%s:%s}",
+                                       svc.Namespace, svc.Name), err)
                        return err
                }
                return nil
        } else if err != nil {
+               r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                       seatav1alpha1.ErrorTypeK8s_HeadlessService, 
fmt.Sprintf("Failed to get SeataServer Service {%s:%s}",
+                               svc.Namespace, svc.Name), err)
                return err
        } else {
-               logger.Info(fmt.Sprintf("Updating existing SeataServer Service 
{%s:%s}",
+               r.Log.Info(fmt.Sprintf("Updating existing SeataServer Service 
{%s:%s}",
                        foundSvc.Namespace, foundSvc.Name))
-
                seata.SyncService(foundSvc, svc)
                err = r.Client.Update(ctx, foundSvc)
                if err != nil {
+                       r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                               seatav1alpha1.ErrorTypeK8s_HeadlessService, 
fmt.Sprintf("Failed to update SeataServer Service {%s:%s}",
+                                       foundSvc.Namespace, foundSvc.Name), err)
                        return err
                }
        }
@@ -146,8 +155,6 @@ func (r *SeataServerReconciler) 
reconcileHeadlessService(ctx context.Context, s
 }
 
 func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s 
*seatav1alpha1.SeataServer) (err error) {
-       logger := log.FromContext(ctx)
-
        sts := seata.MakeStatefulSet(s)
        if err := controllerutil.SetControllerReference(s, sts, r.Scheme); err 
!= nil {
                return err
@@ -158,18 +165,27 @@ func (r *SeataServerReconciler) reconcileStatefulSet(ctx 
context.Context, s *sea
                Namespace: sts.Namespace,
        }, foundSts)
        if err != nil && errors.IsNotFound(err) {
-               logger.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet 
{%s:%s}",
+               r.Log.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet 
{%s:%s}",
                        sts.Namespace, sts.Name))
                err = r.Client.Create(ctx, sts)
                if err != nil {
+                       r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                               seatav1alpha1.ErrorTypeK8s_StatefulSet, 
fmt.Sprintf("Failed to create SeataServer StatefulSet {%s:%s}",
+                                       sts.Namespace, sts.Name), err)
                        return err
                }
                return nil
        } else if err != nil {
+               r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                       seatav1alpha1.ErrorTypeK8s_StatefulSet, 
fmt.Sprintf("Failed to get SeataServer StatefulSet {%s:%s}",
+                               sts.Namespace, sts.Name), err)
                return err
        } else {
                err = r.updateStatefulSet(ctx, s, foundSts, sts)
                if err != nil {
+                       r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                               seatav1alpha1.ErrorTypeK8s_StatefulSet, 
fmt.Sprintf("Failed to update SeataServer StatefulSet {%s:%s}",
+                                       foundSts.Namespace, foundSts.Name), err)
                        return err
                }
        }
@@ -179,9 +195,8 @@ func (r *SeataServerReconciler) reconcileStatefulSet(ctx 
context.Context, s *sea
 
 func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s 
*seatav1alpha1.SeataServer,
        foundSts *appsv1.StatefulSet, sts *appsv1.StatefulSet) (err error) {
-       logger := log.FromContext(ctx)
 
-       logger.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet 
{%s:%s}", foundSts.Namespace, foundSts.Name))
+       r.Log.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet 
{%s:%s}", foundSts.Namespace, foundSts.Name))
        seata.SyncStatefulSet(foundSts, sts)
 
        err = r.Client.Update(ctx, foundSts)
@@ -202,21 +217,21 @@ func (r *SeataServerReconciler) updateStatefulSet(ctx 
context.Context, s *seatav
                        if env.Name == "console.user.username" {
                                username, err = seata.FetchEnvVar(ctx, 
r.Client, s, env)
                                if err != nil {
-                                       logger.Error(err, "Failed to fetch Env 
console.user.username")
+                                       r.Log.Error(err, "Failed to fetch Env 
console.user.username")
                                }
                        }
                        if env.Name == "console.user.password" {
                                password, err = seata.FetchEnvVar(ctx, 
r.Client, s, env)
                                if err != nil {
-                                       logger.Error(err, "Failed to fetch Env 
console.user.password")
+                                       r.Log.Error(err, "Failed to fetch Env 
console.user.password")
                                }
                        }
                }
                if err = seata.SyncRaftCluster(ctx, s, username, password); err 
!= nil {
-                       logger.Error(err, "Failed to synchronize the raft 
cluster")
+                       r.Log.Error(err, "Failed to synchronize the raft 
cluster")
                        s.Status.Synchronized = false
                } else {
-                       logger.Info("Successfully synchronized the raft 
cluster")
+                       r.Log.Info("Successfully synchronized the raft cluster")
                        s.Status.Synchronized = true
                }
        }
@@ -231,6 +246,8 @@ func (r *SeataServerReconciler) reconcileFinalizers(ctx 
context.Context, instanc
                if !utils.ContainsString(instance.ObjectMeta.Finalizers, 
utils.SeataFinalizer) {
                        instance.ObjectMeta.Finalizers = 
append(instance.ObjectMeta.Finalizers, utils.SeataFinalizer)
                        if err = r.Client.Update(ctx, instance); err != nil {
+                               r.recordError(ctx, types.NamespacedName{Name: 
instance.Name, Namespace: instance.Namespace},
+                                       seatav1alpha1.ErrorTypeK8s_SeataServer, 
fmt.Sprintf("Failed to update resource SeataServer(%v) Finalizers", 
instance.Name), err)
                                return err
                        }
                }
@@ -238,10 +255,14 @@ func (r *SeataServerReconciler) reconcileFinalizers(ctx 
context.Context, instanc
        } else {
                if utils.ContainsString(instance.ObjectMeta.Finalizers, 
utils.SeataFinalizer) {
                        if err = r.cleanUpAllPVCs(ctx, instance); err != nil {
+                               r.recordError(ctx, types.NamespacedName{Name: 
instance.Name, Namespace: instance.Namespace},
+                                       seatav1alpha1.ErrorTypeK8s_SeataServer, 
fmt.Sprintf("Failed to delete resource SeataServer(%v) PVCs", instance.Name), 
err)
                                return err
                        }
                        instance.ObjectMeta.Finalizers = 
utils.RemoveString(instance.ObjectMeta.Finalizers, utils.SeataFinalizer)
                        if err = r.Client.Update(ctx, instance); err != nil {
+                               r.recordError(ctx, types.NamespacedName{Name: 
instance.Name, Namespace: instance.Namespace},
+                                       seatav1alpha1.ErrorTypeK8s_SeataServer, 
fmt.Sprintf("Failed to update resource SeataServer(%v) Finalizers", 
instance.Name), err)
                                return err
                        }
                }
@@ -250,14 +271,13 @@ func (r *SeataServerReconciler) reconcileFinalizers(ctx 
context.Context, instanc
 }
 
 func (r *SeataServerReconciler) cleanupOrphanPVCs(ctx context.Context, s 
*seatav1alpha1.SeataServer) (err error) {
-       logger := log.FromContext(ctx)
        // this check should make sure we do not delete the PVCs before the STS 
has scaled down
        if s.Status.ReadyReplicas == s.Spec.Replicas {
                pvcCount, err := r.getPVCCount(ctx, s)
                if err != nil {
                        return err
                }
-               logger.Info(fmt.Sprintf("cleanupOrphanPVCs with PVC count %d 
and ReadyReplicas count %d", pvcCount, s.Status.ReadyReplicas))
+               r.Log.Info(fmt.Sprintf("cleanupOrphanPVCs with PVC count %d and 
ReadyReplicas count %d", pvcCount, s.Status.ReadyReplicas))
                if pvcCount > int(s.Spec.Replicas) {
                        pvcList, err := r.getPVCList(ctx, s)
                        if err != nil {
@@ -286,6 +306,9 @@ func (r *SeataServerReconciler) getPVCList(ctx 
context.Context, s *seatav1alpha1
        selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
                MatchLabels: map[string]string{"app": s.GetName(), "uid": 
string(s.UID)},
        })
+       if err != nil {
+               return pvList, err
+       }
        pvclistOps := &client.ListOptions{
                Namespace:     s.Namespace,
                LabelSelector: selector,
@@ -307,17 +330,16 @@ func (r *SeataServerReconciler) cleanUpAllPVCs(ctx 
context.Context, s *seatav1al
 }
 
 func (r *SeataServerReconciler) deletePVC(ctx context.Context, pvcItem 
apiv1.PersistentVolumeClaim) {
-       logger := log.FromContext(ctx)
        pvcDelete := &apiv1.PersistentVolumeClaim{
                ObjectMeta: metav1.ObjectMeta{
                        Name:      pvcItem.Name,
                        Namespace: pvcItem.Namespace,
                },
        }
-       logger.Info(fmt.Sprintf("Deleting PVC with name %s", pvcItem.Name))
+       r.Log.Info(fmt.Sprintf("Deleting PVC with name %s", pvcItem.Name))
        err := r.Client.Delete(ctx, pvcDelete)
        if err != nil {
-               logger.Error(err, fmt.Sprintf("Error deleting PVC with name 
%s", pvcDelete))
+               r.Log.Error(err, fmt.Sprintf("Error deleting PVC with name %s", 
pvcDelete))
        }
 }
 
@@ -330,3 +352,27 @@ func (r *SeataServerReconciler) SetupWithManager(mgr 
ctrl.Manager) error {
                WithEventFilter(predicate.GenerationChangedPredicate{}).
                Complete(r)
 }
+
+// update SeataServer error status
+func (r *SeataServerReconciler) recordError(ctx context.Context, prKey 
client.ObjectKey, errorType seatav1alpha1.ServerErrorType, errMsg string, err 
error) error {
+       r.Log.Error(err, errMsg)
+       return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+               newError := seatav1alpha1.SeataServerError{
+                       Type:      errorType.String(),
+                       Message:   errMsg,
+                       Timestamp: metav1.Now(),
+               }
+               toUpdate := seatav1alpha1.SeataServer{}
+               err := r.Get(ctx, prKey, &toUpdate)
+               if err != nil {
+                       r.Log.Error(err, "get seata server object error ", 
"prkey", prKey)
+                       return err
+               }
+               // save recently `MaxErrorRecords` error
+               if len(toUpdate.Status.Errors) >= MaxRecentErrorRecords {
+                       toUpdate.Status.Errors = toUpdate.Status.Errors[1:]
+               }
+               toUpdate.Status.Errors = append(toUpdate.Status.Errors, 
newError)
+               return r.Status().Update(ctx, &toUpdate)
+       })
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to