This is an automated email from the ASF dual-hosted git repository.

jimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-k8s.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f6a241  optimize: optimize controller reconcile (#38)
7f6a241 is described below

commit 7f6a24145ab788f1ce81ebe47b3731b9a14bc37a
Author: jimin <[email protected]>
AuthorDate: Sun Nov 23 00:21:58 2025 +0800

    optimize: optimize controller reconcile (#38)
---
 controllers/seataserver_controller.go | 158 ++++++++++++++++++----------------
 main.go                               |  21 ++---
 pkg/seata/generators.go               |  63 +++++++-------
 pkg/utils/utils.go                    |  32 ++++---
 4 files changed, 148 insertions(+), 126 deletions(-)

diff --git a/controllers/seataserver_controller.go 
b/controllers/seataserver_controller.go
index 67aab90..f52b642 100644
--- a/controllers/seataserver_controller.go
+++ b/controllers/seataserver_controller.go
@@ -55,6 +55,54 @@ const (
        MaxRecentErrorRecords = 5
 )
 
+// reconcileClientObject is a generic method to create, update and sync 
Kubernetes objects
+func (r *SeataServerReconciler) reconcileClientObject(
+       ctx context.Context,
+       s *seatav1alpha1.SeataServer,
+       obj client.Object,
+       getFunc func() client.Object,
+       syncFunc func(found, desired client.Object),
+       errorType seatav1alpha1.ServerErrorType,
+       objDesc string,
+) error {
+       // Set controller reference
+       if err := controllerutil.SetControllerReference(s, obj, r.Scheme); err 
!= nil {
+               r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                       errorType, fmt.Sprintf("Failed to set owner reference 
for %s", objDesc), err)
+               return err
+       }
+
+       // Get existing object
+       found := getFunc()
+       err := r.Client.Get(ctx, types.NamespacedName{Name: obj.GetName(), 
Namespace: obj.GetNamespace()}, found)
+
+       switch {
+       case err != nil && errors.IsNotFound(err):
+               // Create new object if not found
+               r.Log.Info(fmt.Sprintf("Creating new %s: %s/%s", objDesc, 
obj.GetNamespace(), obj.GetName()))
+               if err := r.Client.Create(ctx, obj); err != nil {
+                       r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                               errorType, fmt.Sprintf("Failed to create %s", 
objDesc), err)
+                       return err
+               }
+       case err != nil:
+               // Error occurred during get operation
+               r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                       errorType, fmt.Sprintf("Failed to get %s", objDesc), 
err)
+               return err
+       default:
+               // Update existing object
+               r.Log.Info(fmt.Sprintf("Updating %s: %s/%s", objDesc, 
found.GetNamespace(), found.GetName()))
+               syncFunc(found, obj)
+               if err := r.Client.Update(ctx, found); err != nil {
+                       r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
+                               errorType, fmt.Sprintf("Failed to update %s", 
objDesc), err)
+                       return err
+               }
+       }
+       return nil
+}
+
 
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers,verbs=get;list;watch;create;update;patch;delete
 
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/status,verbs=get;update;patch
 
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/finalizers,verbs=update
@@ -77,16 +125,18 @@ func (r *SeataServerReconciler) Reconcile(ctx 
context.Context, req ctrl.Request)
        s := &seatav1alpha1.SeataServer{}
        if err := r.Get(ctx, req.NamespacedName, s); err != nil {
                if errors.IsNotFound(err) {
-                       r.recordError(ctx, req.NamespacedName, 
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to get resource 
SeataServer(%v),err %s", req.NamespacedName, err.Error()), err)
+                       // Resource already deleted
                        return ctrl.Result{}, nil
                }
+               r.recordError(ctx, req.NamespacedName, 
seatav1alpha1.ErrorTypeK8s_SeataServer, "Failed to fetch SeataServer", err)
+               return ctrl.Result{}, err
        }
 
-       changed := s.WithDefaults()
-       if changed {
-               r.Log.Info(fmt.Sprintf("Setting default values for 
SeataServer(%v)", req.NamespacedName))
+       // Apply default values if needed
+       if changed := s.WithDefaults(); changed {
+               r.Log.Info("Setting default values for SeataServer", "name", 
s.Name, "namespace", s.Namespace)
                if err := r.Client.Update(ctx, s); err != nil {
-                       r.recordError(ctx, req.NamespacedName, 
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource 
SeataServer(%v)", req.NamespacedName), err)
+                       r.recordError(ctx, req.NamespacedName, 
seatav1alpha1.ErrorTypeK8s_SeataServer, "Failed to update SeataServer with 
defaults", err)
                        return reconcile.Result{}, err
                }
                return reconcile.Result{Requeue: true}, nil
@@ -103,93 +153,56 @@ func (r *SeataServerReconciler) Reconcile(ctx 
context.Context, req ctrl.Request)
        }
 
        if !s.Status.Synchronized {
-               r.Log.Info(fmt.Sprintf("SeataServer(%v) has not been 
synchronized yet, requeue in %d seconds",
-                       req.NamespacedName, RequeueSeconds))
+               r.Log.Info("SeataServer not synchronized yet, requeuing", 
"name", s.Name, "namespace", s.Namespace, "requeueAfter", RequeueSeconds)
                return ctrl.Result{Requeue: true, RequeueAfter: RequeueSeconds 
* time.Second}, nil
        }
 
        return ctrl.Result{}, nil
 }
 
-func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, 
s *seatav1alpha1.SeataServer) (err error) {
+func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, 
s *seatav1alpha1.SeataServer) error {
        svc := seata.MakeHeadlessService(s)
-       if err := controllerutil.SetControllerReference(s, svc, r.Scheme); err 
!= nil {
-               r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
-                       seatav1alpha1.ErrorTypeK8s_HeadlessService, 
fmt.Sprintf("Failed to set owner reference for SeataServer(%v)", s.Name), err)
-               return err
-       }
-       foundSvc := &apiv1.Service{}
-       err = r.Client.Get(ctx, types.NamespacedName{
-               Name:      svc.Name,
-               Namespace: svc.Namespace,
-       }, foundSvc)
-       if err != nil && errors.IsNotFound(err) {
-               r.Log.Info(fmt.Sprintf("Creating a new SeataServer Service 
{%s:%s}",
-                       svc.Namespace, svc.Name))
-               err = r.Client.Create(ctx, svc)
-               if err != nil {
-                       r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
-                               seatav1alpha1.ErrorTypeK8s_HeadlessService, 
fmt.Sprintf("Failed to create SeataServer Service {%s:%s}",
-                                       svc.Namespace, svc.Name), err)
-                       return err
-               }
-               return nil
-       } else if err != nil {
-               r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
-                       seatav1alpha1.ErrorTypeK8s_HeadlessService, 
fmt.Sprintf("Failed to get SeataServer Service {%s:%s}",
-                               svc.Namespace, svc.Name), err)
-               return err
-       } else {
-               r.Log.Info(fmt.Sprintf("Updating existing SeataServer Service 
{%s:%s}",
-                       foundSvc.Namespace, foundSvc.Name))
-               seata.SyncService(foundSvc, svc)
-               err = r.Client.Update(ctx, foundSvc)
-               if err != nil {
-                       r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
-                               seatav1alpha1.ErrorTypeK8s_HeadlessService, 
fmt.Sprintf("Failed to update SeataServer Service {%s:%s}",
-                                       foundSvc.Namespace, foundSvc.Name), err)
-                       return err
-               }
-       }
-       return nil
+       return r.reconcileClientObject(
+               ctx,
+               s,
+               svc,
+               func() client.Object { return &apiv1.Service{} },
+               func(found, desired client.Object) {
+                       seata.SyncService(found.(*apiv1.Service), 
desired.(*apiv1.Service))
+               },
+               seatav1alpha1.ErrorTypeK8s_HeadlessService,
+               "Headless Service",
+       )
 }
 
-func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s 
*seatav1alpha1.SeataServer) (err error) {
+func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s 
*seatav1alpha1.SeataServer) error {
        sts := seata.MakeStatefulSet(s)
        if err := controllerutil.SetControllerReference(s, sts, r.Scheme); err 
!= nil {
                return err
        }
+
        foundSts := &appsv1.StatefulSet{}
-       err = r.Client.Get(ctx, types.NamespacedName{
-               Name:      sts.Name,
-               Namespace: sts.Namespace,
-       }, foundSts)
-       if err != nil && errors.IsNotFound(err) {
-               r.Log.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet 
{%s:%s}",
-                       sts.Namespace, sts.Name))
-               err = r.Client.Create(ctx, sts)
-               if err != nil {
+       err := r.Client.Get(ctx, types.NamespacedName{Name: sts.Name, 
Namespace: sts.Namespace}, foundSts)
+
+       switch {
+       case err != nil && errors.IsNotFound(err):
+               r.Log.Info(fmt.Sprintf("Creating new StatefulSet: %s/%s", 
sts.Namespace, sts.Name))
+               if err := r.Client.Create(ctx, sts); err != nil {
                        r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
-                               seatav1alpha1.ErrorTypeK8s_StatefulSet, 
fmt.Sprintf("Failed to create SeataServer StatefulSet {%s:%s}",
-                                       sts.Namespace, sts.Name), err)
+                               seatav1alpha1.ErrorTypeK8s_StatefulSet, "Failed 
to create StatefulSet", err)
                        return err
                }
-               return nil
-       } else if err != nil {
+       case err != nil:
                r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
-                       seatav1alpha1.ErrorTypeK8s_StatefulSet, 
fmt.Sprintf("Failed to get SeataServer StatefulSet {%s:%s}",
-                               sts.Namespace, sts.Name), err)
+                       seatav1alpha1.ErrorTypeK8s_StatefulSet, "Failed to get 
StatefulSet", err)
                return err
-       } else {
-               err = r.updateStatefulSet(ctx, s, foundSts, sts)
-               if err != nil {
+       default:
+               if err := r.updateStatefulSet(ctx, s, foundSts, sts); err != 
nil {
                        r.recordError(ctx, types.NamespacedName{Name: s.Name, 
Namespace: s.Namespace},
-                               seatav1alpha1.ErrorTypeK8s_StatefulSet, 
fmt.Sprintf("Failed to update SeataServer StatefulSet {%s:%s}",
-                                       foundSts.Namespace, foundSts.Name), err)
+                               seatav1alpha1.ErrorTypeK8s_StatefulSet, "Failed 
to update StatefulSet", err)
                        return err
                }
        }
-
        return nil
 }
 
@@ -336,10 +349,9 @@ func (r *SeataServerReconciler) deletePVC(ctx 
context.Context, pvcItem apiv1.Per
                        Namespace: pvcItem.Namespace,
                },
        }
-       r.Log.Info(fmt.Sprintf("Deleting PVC with name %s", pvcItem.Name))
-       err := r.Client.Delete(ctx, pvcDelete)
-       if err != nil {
-               r.Log.Error(err, fmt.Sprintf("Error deleting PVC with name %s", 
pvcDelete))
+       r.Log.Info("Deleting PVC", "name", pvcItem.Name, "namespace", 
pvcItem.Namespace)
+       if err := r.Client.Delete(ctx, pvcDelete); err != nil {
+               r.Log.Error(err, "Failed to delete PVC", "name", pvcItem.Name, 
"namespace", pvcItem.Namespace)
        }
 }
 
diff --git a/main.go b/main.go
index 87d5153..64ce951 100644
--- a/main.go
+++ b/main.go
@@ -20,6 +20,7 @@ package main
 import (
        "flag"
        "os"
+
        metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 
        // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, 
etc.)
@@ -51,45 +52,39 @@ func init() {
 }
 
 func main() {
+       // Declare command line flags (parsed below, once the logger options are bound)
        var metricsAddr string
        var enableLeaderElection bool
        var probeAddr string
+
        flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The 
address the metric endpoint binds to.")
        flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The 
address the probe endpoint binds to.")
        flag.BoolVar(&enableLeaderElection, "leader-elect", false,
                "Enable leader election for controller manager. "+
                        "Enabling this will ensure there is only one active 
controller manager.")
+
+       // Setup logger
        opts := zap.Options{
                Development: true,
        }
        opts.BindFlags(flag.CommandLine)
        flag.Parse()
-
        ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
 
+       // Create the controller manager (it is started at the end of main)
        mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
                Scheme:                 scheme,
                Metrics:                metricsserver.Options{BindAddress: 
metricsAddr},
                HealthProbeBindAddress: probeAddr,
                LeaderElection:         enableLeaderElection,
                LeaderElectionID:       "e60df718.seata.apache.org",
-               // LeaderElectionReleaseOnCancel defines if the leader should 
step down voluntarily
-               // when the Manager ends. This requires the binary to 
immediately end when the
-               // Manager is stopped, otherwise, this setting is unsafe. 
Setting this significantly
-               // speeds up voluntary leader transitions as the new leader 
don't have to wait
-               // LeaseDuration time first.
-               //
-               // In the default scaffold provided, the program ends 
immediately after
-               // the manager stops, so would be fine to enable this option. 
However,
-               // if you are doing or is intended to do any operation such as 
perform cleanups
-               // after the manager stops then its usage might be unsafe.
-               // LeaderElectionReleaseOnCancel: true,
        })
        if err != nil {
                setupLog.Error(err, "unable to start manager")
                os.Exit(1)
        }
 
+       // Setup SeataServer reconciler
        if err = (&controllers.SeataServerReconciler{
                Client: mgr.GetClient(),
                Scheme: mgr.GetScheme(),
@@ -99,6 +94,7 @@ func main() {
        }
        //+kubebuilder:scaffold:builder
 
+       // Setup health checks
        if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
                setupLog.Error(err, "unable to set up health check")
                os.Exit(1)
@@ -108,6 +104,7 @@ func main() {
                os.Exit(1)
        }
 
+       // Start manager
        setupLog.Info("starting manager")
        if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
                setupLog.Error(err, "problem running manager")
diff --git a/pkg/seata/generators.go b/pkg/seata/generators.go
index a5e7a16..47830e2 100644
--- a/pkg/seata/generators.go
+++ b/pkg/seata/generators.go
@@ -19,12 +19,13 @@ package seata
 
 import (
        "fmt"
+       "strconv"
+
        seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
        "github.com/apache/seata-k8s/pkg/utils"
        appsv1 "k8s.io/api/apps/v1"
        apiv1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-       "strconv"
 )
 
 func makeLabels(name string) map[string]string {
@@ -93,38 +94,44 @@ func MakeStatefulSet(s *seatav1alpha1.SeataServer) 
*appsv1.StatefulSet {
                },
        }
 
-       statefulSet.Spec.Template.Spec.Containers = []apiv1.Container{{
-               Name:  s.Spec.ContainerName,
-               Image: s.Spec.Image,
-               Command: []string{
-                       "/bin/bash",
-               },
+       container := &apiv1.Container{
+               Name:    s.Spec.ContainerName,
+               Image:   s.Spec.Image,
+               Command: []string{"/bin/bash"},
                Args: []string{
                        "-c",
-                       fmt.Sprintf("export SEATA_IP=$(HOST_NAME).%s;", 
s.Spec.ServiceName) +
-                               fmt.Sprintf("python3 -c \"\n%s\n\";", 
PythonScript) +
-                               "/bin/bash /seata-server-entrypoint.sh;",
+                       buildEntrypointScript(s),
                },
                Ports: []apiv1.ContainerPort{
                        {Name: "service-port", ContainerPort: 
s.Spec.Ports.ServicePort},
                        {Name: "console-port", ContainerPort: 
s.Spec.Ports.ConsolePort},
                        {Name: "raft-port", ContainerPort: 
s.Spec.Ports.RaftPort},
                },
-               VolumeMounts: []apiv1.VolumeMount{{
-                       Name:      "seata-store",
-                       MountPath: "/seata-server/sessionStore",
-               }},
-       }}
-
-       container := &statefulSet.Spec.Template.Spec.Containers[0]
-       container.Resources = s.Spec.Resources
+               VolumeMounts: []apiv1.VolumeMount{
+                       {
+                               Name:      pvcName,
+                               MountPath: "/seata-server/sessionStore",
+                       },
+               },
+               Resources: s.Spec.Resources,
+               Env:       buildEnvVars(s),
+       }
+
+       statefulSet.Spec.Template.Spec.Containers = 
[]apiv1.Container{*container}
        statefulSet.Spec.Replicas = &s.Spec.Replicas
-       container.Image = s.Spec.Image
 
-       container.Ports[0].ContainerPort = s.Spec.Ports.ConsolePort
-       container.Ports[1].ContainerPort = s.Spec.Ports.ServicePort
-       container.Ports[2].ContainerPort = s.Spec.Ports.RaftPort
+       return statefulSet
+}
+
+// buildEntrypointScript constructs the container entrypoint script
+func buildEntrypointScript(s *seatav1alpha1.SeataServer) string {
+       return fmt.Sprintf("export SEATA_IP=$(HOST_NAME).%s;", 
s.Spec.ServiceName) +
+               fmt.Sprintf("python3 -c \"\n%s\n\";", PythonScript) +
+               "/bin/bash /seata-server-entrypoint.sh;"
+}
 
+// buildEnvVars constructs environment variables for Seata server
+func buildEnvVars(s *seatav1alpha1.SeataServer) []apiv1.EnvVar {
        envs := []apiv1.EnvVar{
                {
                        Name: "HOST_NAME",
@@ -135,14 +142,10 @@ func MakeStatefulSet(s *seatav1alpha1.SeataServer) 
*appsv1.StatefulSet {
                {Name: "store.mode", Value: "raft"},
                {Name: "server.port", Value: 
strconv.Itoa(int(s.Spec.Ports.ConsolePort))},
                {Name: "server.servicePort", Value: 
strconv.Itoa(int(s.Spec.Ports.ServicePort))},
+               {Name: "server.raft.serverAddr", Value: 
utils.ConcatRaftServerAddress(s)},
        }
 
-       addr := utils.ConcatRaftServerAddress(s)
-       envs = append(envs, apiv1.EnvVar{Name: "server.raft.serverAddr", Value: 
addr})
-       for _, env := range s.Spec.Env {
-               envs = append(envs, env)
-       }
-       container.Env = envs
-
-       return statefulSet
+       // Append user-provided environment variables
+       envs = append(envs, s.Spec.Env...)
+       return envs
 }
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 38222fb..38dcfc3 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -19,27 +19,32 @@ package utils
 
 import (
        "fmt"
-       seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
        "strconv"
        "strings"
+
+       seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
 )
 
 const (
        SeataFinalizer = "cleanUpSeataPVC"
 )
 
+// ConcatRaftServerAddress builds a comma-separated list of Raft server 
addresses
 func ConcatRaftServerAddress(s *seatav1alpha1.SeataServer) string {
        var addrBuilder strings.Builder
        for i := int32(0); i < s.Spec.Replicas; i++ {
-               // Add governed service name to communicate to each other
+               // Format: pod-ordinal.service-name:raft-port
                addrBuilder.WriteString(fmt.Sprintf("%s-%d.%s:%d,", s.Name, i, 
s.Spec.ServiceName, s.Spec.Ports.RaftPort))
-               //addrBuilder.WriteString(fmt.Sprintf("%s-%d:%d,", s.Name, i, 
s.Spec.Ports.RaftPort))
        }
+
        addr := addrBuilder.String()
-       addr = addr[:len(addr)-1]
+       if len(addr) > 0 {
+               addr = addr[:len(addr)-1] // Remove trailing comma
+       }
        return addr
 }
 
+// ContainsString checks if a string slice contains a specific string
 func ContainsString(slice []string, str string) bool {
        for _, item := range slice {
                if item == str {
@@ -49,26 +54,31 @@ func ContainsString(slice []string, str string) bool {
        return false
 }
 
-func RemoveString(slice []string, str string) (result []string) {
+// RemoveString returns a new slice with every occurrence of str removed
+func RemoveString(slice []string, str string) []string {
+       var result []string
        for _, item := range slice {
-               if item == str {
-                       continue
+               if item != str {
+                       result = append(result, item)
                }
-               result = append(result, item)
        }
        return result
 }
 
+// IsPVCOrphan checks if a PVC is orphaned (its ordinal >= replicas)
 func IsPVCOrphan(pvcName string, replicas int32) bool {
-       index := strings.LastIndexAny(pvcName, "-")
-       if index == -1 {
+       // PVC names follow pattern: <name>-<ordinal>
+       lastDashIdx := strings.LastIndexAny(pvcName, "-")
+       if lastDashIdx == -1 {
                return false
        }
 
-       ordinal, err := strconv.Atoi(pvcName[index+1:])
+       // Extract and parse the ordinal from PVC name
+       ordinal, err := strconv.Atoi(pvcName[lastDashIdx+1:])
        if err != nil {
                return false
        }
 
+       // PVC is orphaned if its ordinal is >= current replica count
        return int32(ordinal) >= replicas
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to