This is an automated email from the ASF dual-hosted git repository.
jimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-k8s.git
The following commit(s) were added to refs/heads/master by this push:
new 7f6a241 optimize: optimize controller reconcile (#38)
7f6a241 is described below
commit 7f6a24145ab788f1ce81ebe47b3731b9a14bc37a
Author: jimin <[email protected]>
AuthorDate: Sun Nov 23 00:21:58 2025 +0800
optimize: optimize controller reconcile (#38)
---
controllers/seataserver_controller.go | 158 ++++++++++++++++++----------------
main.go | 21 ++---
pkg/seata/generators.go | 63 +++++++-------
pkg/utils/utils.go | 32 ++++---
4 files changed, 148 insertions(+), 126 deletions(-)
diff --git a/controllers/seataserver_controller.go
b/controllers/seataserver_controller.go
index 67aab90..f52b642 100644
--- a/controllers/seataserver_controller.go
+++ b/controllers/seataserver_controller.go
@@ -55,6 +55,54 @@ const (
MaxRecentErrorRecords = 5
)
+// reconcileClientObject is a generic method to create, update and sync
Kubernetes objects
+func (r *SeataServerReconciler) reconcileClientObject(
+ ctx context.Context,
+ s *seatav1alpha1.SeataServer,
+ obj client.Object,
+ getFunc func() client.Object,
+ syncFunc func(found, desired client.Object),
+ errorType seatav1alpha1.ServerErrorType,
+ objDesc string,
+) error {
+ // Set controller reference
+ if err := controllerutil.SetControllerReference(s, obj, r.Scheme); err
!= nil {
+ r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
+ errorType, fmt.Sprintf("Failed to set owner reference
for %s", objDesc), err)
+ return err
+ }
+
+ // Get existing object
+ found := getFunc()
+ err := r.Client.Get(ctx, types.NamespacedName{Name: obj.GetName(),
Namespace: obj.GetNamespace()}, found)
+
+ switch {
+ case err != nil && errors.IsNotFound(err):
+ // Create new object if not found
+ r.Log.Info(fmt.Sprintf("Creating new %s: %s/%s", objDesc,
obj.GetNamespace(), obj.GetName()))
+ if err := r.Client.Create(ctx, obj); err != nil {
+ r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
+ errorType, fmt.Sprintf("Failed to create %s",
objDesc), err)
+ return err
+ }
+ case err != nil:
+ // Error occurred during get operation
+ r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
+ errorType, fmt.Sprintf("Failed to get %s", objDesc),
err)
+ return err
+ default:
+ // Update existing object
+ r.Log.Info(fmt.Sprintf("Updating %s: %s/%s", objDesc,
found.GetNamespace(), found.GetName()))
+ syncFunc(found, obj)
+ if err := r.Client.Update(ctx, found); err != nil {
+ r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
+ errorType, fmt.Sprintf("Failed to update %s",
objDesc), err)
+ return err
+ }
+ }
+ return nil
+}
+
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/finalizers,verbs=update
@@ -77,16 +125,18 @@ func (r *SeataServerReconciler) Reconcile(ctx
context.Context, req ctrl.Request)
s := &seatav1alpha1.SeataServer{}
if err := r.Get(ctx, req.NamespacedName, s); err != nil {
if errors.IsNotFound(err) {
- r.recordError(ctx, req.NamespacedName,
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to get resource
SeataServer(%v),err %s", req.NamespacedName, err.Error()), err)
+ // Resource already deleted
return ctrl.Result{}, nil
}
+ r.recordError(ctx, req.NamespacedName,
seatav1alpha1.ErrorTypeK8s_SeataServer, "Failed to fetch SeataServer", err)
+ return ctrl.Result{}, err
}
- changed := s.WithDefaults()
- if changed {
- r.Log.Info(fmt.Sprintf("Setting default values for
SeataServer(%v)", req.NamespacedName))
+ // Apply default values if needed
+ if changed := s.WithDefaults(); changed {
+ r.Log.Info("Setting default values for SeataServer", "name",
s.Name, "namespace", s.Namespace)
if err := r.Client.Update(ctx, s); err != nil {
- r.recordError(ctx, req.NamespacedName,
seatav1alpha1.ErrorTypeK8s_SeataServer, fmt.Sprintf("Failed to update resource
SeataServer(%v)", req.NamespacedName), err)
+ r.recordError(ctx, req.NamespacedName,
seatav1alpha1.ErrorTypeK8s_SeataServer, "Failed to update SeataServer with
defaults", err)
return reconcile.Result{}, err
}
return reconcile.Result{Requeue: true}, nil
@@ -103,93 +153,56 @@ func (r *SeataServerReconciler) Reconcile(ctx
context.Context, req ctrl.Request)
}
if !s.Status.Synchronized {
- r.Log.Info(fmt.Sprintf("SeataServer(%v) has not been
synchronized yet, requeue in %d seconds",
- req.NamespacedName, RequeueSeconds))
+ r.Log.Info("SeataServer not synchronized yet, requeuing",
"name", s.Name, "namespace", s.Namespace, "requeueAfter", RequeueSeconds)
return ctrl.Result{Requeue: true, RequeueAfter: RequeueSeconds
* time.Second}, nil
}
return ctrl.Result{}, nil
}
-func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context,
s *seatav1alpha1.SeataServer) (err error) {
+func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context,
s *seatav1alpha1.SeataServer) error {
svc := seata.MakeHeadlessService(s)
- if err := controllerutil.SetControllerReference(s, svc, r.Scheme); err
!= nil {
- r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
- seatav1alpha1.ErrorTypeK8s_HeadlessService,
fmt.Sprintf("Failed to set owner reference for SeataServer(%v)", s.Name), err)
- return err
- }
- foundSvc := &apiv1.Service{}
- err = r.Client.Get(ctx, types.NamespacedName{
- Name: svc.Name,
- Namespace: svc.Namespace,
- }, foundSvc)
- if err != nil && errors.IsNotFound(err) {
- r.Log.Info(fmt.Sprintf("Creating a new SeataServer Service
{%s:%s}",
- svc.Namespace, svc.Name))
- err = r.Client.Create(ctx, svc)
- if err != nil {
- r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
- seatav1alpha1.ErrorTypeK8s_HeadlessService,
fmt.Sprintf("Failed to create SeataServer Service {%s:%s}",
- svc.Namespace, svc.Name), err)
- return err
- }
- return nil
- } else if err != nil {
- r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
- seatav1alpha1.ErrorTypeK8s_HeadlessService,
fmt.Sprintf("Failed to get SeataServer Service {%s:%s}",
- svc.Namespace, svc.Name), err)
- return err
- } else {
- r.Log.Info(fmt.Sprintf("Updating existing SeataServer Service
{%s:%s}",
- foundSvc.Namespace, foundSvc.Name))
- seata.SyncService(foundSvc, svc)
- err = r.Client.Update(ctx, foundSvc)
- if err != nil {
- r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
- seatav1alpha1.ErrorTypeK8s_HeadlessService,
fmt.Sprintf("Failed to update SeataServer Service {%s:%s}",
- foundSvc.Namespace, foundSvc.Name), err)
- return err
- }
- }
- return nil
+ return r.reconcileClientObject(
+ ctx,
+ s,
+ svc,
+ func() client.Object { return &apiv1.Service{} },
+ func(found, desired client.Object) {
+ seata.SyncService(found.(*apiv1.Service),
desired.(*apiv1.Service))
+ },
+ seatav1alpha1.ErrorTypeK8s_HeadlessService,
+ "Headless Service",
+ )
}
-func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s
*seatav1alpha1.SeataServer) (err error) {
+func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s
*seatav1alpha1.SeataServer) error {
sts := seata.MakeStatefulSet(s)
if err := controllerutil.SetControllerReference(s, sts, r.Scheme); err
!= nil {
return err
}
+
foundSts := &appsv1.StatefulSet{}
- err = r.Client.Get(ctx, types.NamespacedName{
- Name: sts.Name,
- Namespace: sts.Namespace,
- }, foundSts)
- if err != nil && errors.IsNotFound(err) {
- r.Log.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet
{%s:%s}",
- sts.Namespace, sts.Name))
- err = r.Client.Create(ctx, sts)
- if err != nil {
+ err := r.Client.Get(ctx, types.NamespacedName{Name: sts.Name,
Namespace: sts.Namespace}, foundSts)
+
+ switch {
+ case err != nil && errors.IsNotFound(err):
+ r.Log.Info(fmt.Sprintf("Creating new StatefulSet: %s/%s",
sts.Namespace, sts.Name))
+ if err := r.Client.Create(ctx, sts); err != nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
- seatav1alpha1.ErrorTypeK8s_StatefulSet,
fmt.Sprintf("Failed to create SeataServer StatefulSet {%s:%s}",
- sts.Namespace, sts.Name), err)
+ seatav1alpha1.ErrorTypeK8s_StatefulSet, "Failed
to create StatefulSet", err)
return err
}
- return nil
- } else if err != nil {
+ case err != nil:
r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
- seatav1alpha1.ErrorTypeK8s_StatefulSet,
fmt.Sprintf("Failed to get SeataServer StatefulSet {%s:%s}",
- sts.Namespace, sts.Name), err)
+ seatav1alpha1.ErrorTypeK8s_StatefulSet, "Failed to get
StatefulSet", err)
return err
- } else {
- err = r.updateStatefulSet(ctx, s, foundSts, sts)
- if err != nil {
+ default:
+ if err := r.updateStatefulSet(ctx, s, foundSts, sts); err !=
nil {
r.recordError(ctx, types.NamespacedName{Name: s.Name,
Namespace: s.Namespace},
- seatav1alpha1.ErrorTypeK8s_StatefulSet,
fmt.Sprintf("Failed to update SeataServer StatefulSet {%s:%s}",
- foundSts.Namespace, foundSts.Name), err)
+ seatav1alpha1.ErrorTypeK8s_StatefulSet, "Failed
to update StatefulSet", err)
return err
}
}
-
return nil
}
@@ -336,10 +349,9 @@ func (r *SeataServerReconciler) deletePVC(ctx
context.Context, pvcItem apiv1.Per
Namespace: pvcItem.Namespace,
},
}
- r.Log.Info(fmt.Sprintf("Deleting PVC with name %s", pvcItem.Name))
- err := r.Client.Delete(ctx, pvcDelete)
- if err != nil {
- r.Log.Error(err, fmt.Sprintf("Error deleting PVC with name %s",
pvcDelete))
+ r.Log.Info("Deleting PVC", "name", pvcItem.Name, "namespace",
pvcItem.Namespace)
+ if err := r.Client.Delete(ctx, pvcDelete); err != nil {
+ r.Log.Error(err, "Failed to delete PVC", "name", pvcItem.Name,
"namespace", pvcItem.Namespace)
}
}
diff --git a/main.go b/main.go
index 87d5153..64ce951 100644
--- a/main.go
+++ b/main.go
@@ -20,6 +20,7 @@ package main
import (
"flag"
"os"
+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC,
etc.)
@@ -51,45 +52,39 @@ func init() {
}
func main() {
+ // Parse command line flags
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
+
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The
address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The
address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active
controller manager.")
+
+ // Setup logger
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
-
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
+ // Create and start manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress:
metricsAddr},
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "e60df718.seata.apache.org",
- // LeaderElectionReleaseOnCancel defines if the leader should
step down voluntarily
- // when the Manager ends. This requires the binary to
immediately end when the
- // Manager is stopped, otherwise, this setting is unsafe.
Setting this significantly
- // speeds up voluntary leader transitions as the new leader
don't have to wait
- // LeaseDuration time first.
- //
- // In the default scaffold provided, the program ends
immediately after
- // the manager stops, so would be fine to enable this option.
However,
- // if you are doing or is intended to do any operation such as
perform cleanups
- // after the manager stops then its usage might be unsafe.
- // LeaderElectionReleaseOnCancel: true,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
+ // Setup SeataServer reconciler
if err = (&controllers.SeataServerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
@@ -99,6 +94,7 @@ func main() {
}
//+kubebuilder:scaffold:builder
+ // Setup health checks
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
@@ -108,6 +104,7 @@ func main() {
os.Exit(1)
}
+ // Start manager
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
diff --git a/pkg/seata/generators.go b/pkg/seata/generators.go
index a5e7a16..47830e2 100644
--- a/pkg/seata/generators.go
+++ b/pkg/seata/generators.go
@@ -19,12 +19,13 @@ package seata
import (
"fmt"
+ "strconv"
+
seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
"github.com/apache/seata-k8s/pkg/utils"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "strconv"
)
func makeLabels(name string) map[string]string {
@@ -93,38 +94,44 @@ func MakeStatefulSet(s *seatav1alpha1.SeataServer)
*appsv1.StatefulSet {
},
}
- statefulSet.Spec.Template.Spec.Containers = []apiv1.Container{{
- Name: s.Spec.ContainerName,
- Image: s.Spec.Image,
- Command: []string{
- "/bin/bash",
- },
+ container := &apiv1.Container{
+ Name: s.Spec.ContainerName,
+ Image: s.Spec.Image,
+ Command: []string{"/bin/bash"},
Args: []string{
"-c",
- fmt.Sprintf("export SEATA_IP=$(HOST_NAME).%s;",
s.Spec.ServiceName) +
- fmt.Sprintf("python3 -c \"\n%s\n\";",
PythonScript) +
- "/bin/bash /seata-server-entrypoint.sh;",
+ buildEntrypointScript(s),
},
Ports: []apiv1.ContainerPort{
{Name: "service-port", ContainerPort:
s.Spec.Ports.ServicePort},
{Name: "console-port", ContainerPort:
s.Spec.Ports.ConsolePort},
{Name: "raft-port", ContainerPort:
s.Spec.Ports.RaftPort},
},
- VolumeMounts: []apiv1.VolumeMount{{
- Name: "seata-store",
- MountPath: "/seata-server/sessionStore",
- }},
- }}
-
- container := &statefulSet.Spec.Template.Spec.Containers[0]
- container.Resources = s.Spec.Resources
+ VolumeMounts: []apiv1.VolumeMount{
+ {
+ Name: pvcName,
+ MountPath: "/seata-server/sessionStore",
+ },
+ },
+ Resources: s.Spec.Resources,
+ Env: buildEnvVars(s),
+ }
+
+ statefulSet.Spec.Template.Spec.Containers =
[]apiv1.Container{*container}
statefulSet.Spec.Replicas = &s.Spec.Replicas
- container.Image = s.Spec.Image
- container.Ports[0].ContainerPort = s.Spec.Ports.ConsolePort
- container.Ports[1].ContainerPort = s.Spec.Ports.ServicePort
- container.Ports[2].ContainerPort = s.Spec.Ports.RaftPort
+ return statefulSet
+}
+
+// buildEntrypointScript constructs the container entrypoint script
+func buildEntrypointScript(s *seatav1alpha1.SeataServer) string {
+ return fmt.Sprintf("export SEATA_IP=$(HOST_NAME).%s;",
s.Spec.ServiceName) +
+ fmt.Sprintf("python3 -c \"\n%s\n\";", PythonScript) +
+ "/bin/bash /seata-server-entrypoint.sh;"
+}
+// buildEnvVars constructs environment variables for Seata server
+func buildEnvVars(s *seatav1alpha1.SeataServer) []apiv1.EnvVar {
envs := []apiv1.EnvVar{
{
Name: "HOST_NAME",
@@ -135,14 +142,10 @@ func MakeStatefulSet(s *seatav1alpha1.SeataServer)
*appsv1.StatefulSet {
{Name: "store.mode", Value: "raft"},
{Name: "server.port", Value:
strconv.Itoa(int(s.Spec.Ports.ConsolePort))},
{Name: "server.servicePort", Value:
strconv.Itoa(int(s.Spec.Ports.ServicePort))},
+ {Name: "server.raft.serverAddr", Value:
utils.ConcatRaftServerAddress(s)},
}
- addr := utils.ConcatRaftServerAddress(s)
- envs = append(envs, apiv1.EnvVar{Name: "server.raft.serverAddr", Value:
addr})
- for _, env := range s.Spec.Env {
- envs = append(envs, env)
- }
- container.Env = envs
-
- return statefulSet
+ // Append user-provided environment variables
+ envs = append(envs, s.Spec.Env...)
+ return envs
}
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 38222fb..38dcfc3 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -19,27 +19,32 @@ package utils
import (
"fmt"
- seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
"strconv"
"strings"
+
+ seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
)
const (
SeataFinalizer = "cleanUpSeataPVC"
)
+// ConcatRaftServerAddress builds a comma-separated list of Raft server
addresses
func ConcatRaftServerAddress(s *seatav1alpha1.SeataServer) string {
var addrBuilder strings.Builder
for i := int32(0); i < s.Spec.Replicas; i++ {
- // Add governed service name to communicate to each other
+ // Format: pod-ordinal.service-name:raft-port
addrBuilder.WriteString(fmt.Sprintf("%s-%d.%s:%d,", s.Name, i,
s.Spec.ServiceName, s.Spec.Ports.RaftPort))
- //addrBuilder.WriteString(fmt.Sprintf("%s-%d:%d,", s.Name, i,
s.Spec.Ports.RaftPort))
}
+
addr := addrBuilder.String()
- addr = addr[:len(addr)-1]
+ if len(addr) > 0 {
+ addr = addr[:len(addr)-1] // Remove trailing comma
+ }
return addr
}
+// ContainsString checks if a string slice contains a specific string
func ContainsString(slice []string, str string) bool {
for _, item := range slice {
if item == str {
@@ -49,26 +54,31 @@ func ContainsString(slice []string, str string) bool {
return false
}
-func RemoveString(slice []string, str string) (result []string) {
+// RemoveString removes the first occurrence of a string from a slice
+func RemoveString(slice []string, str string) []string {
+ var result []string
for _, item := range slice {
- if item == str {
- continue
+ if item != str {
+ result = append(result, item)
}
- result = append(result, item)
}
return result
}
+// IsPVCOrphan checks if a PVC is orphaned (its ordinal >= replicas)
func IsPVCOrphan(pvcName string, replicas int32) bool {
- index := strings.LastIndexAny(pvcName, "-")
- if index == -1 {
+ // PVC names follow pattern: <name>-<ordinal>
+ lastDashIdx := strings.LastIndexAny(pvcName, "-")
+ if lastDashIdx == -1 {
return false
}
- ordinal, err := strconv.Atoi(pvcName[index+1:])
+ // Extract and parse the ordinal from PVC name
+ ordinal, err := strconv.Atoi(pvcName[lastDashIdx+1:])
if err != nil {
return false
}
+ // PVC is orphaned if its ordinal is >= current replica count
return int32(ordinal) >= replicas
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]