This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new ab2fe68  refactor: refactor reconcile for different workloads (#150)
ab2fe68 is described below

commit ab2fe68eff5ddad29aa3a65f944bc8dabe7d13e8
Author: liyao <[email protected]>
AuthorDate: Tue Dec 20 12:59:08 2022 +0800

    refactor: refactor reconcile for different workloads (#150)
    
    * chore: update gitignore
    
    Signed-off-by: mlycore <[email protected]>
    
    * refactor: update deployment reconcile
    
    Signed-off-by: mlycore <[email protected]>
    
    * refactor: remove diff
    
    Signed-off-by: mlycore <[email protected]>
    
    * refactor: add exp to UpdateService
    
    Signed-off-by: mlycore <[email protected]>
    
    * refactor: move proxy to near update
    
    Signed-off-by: mlycore <[email protected]>
    
    * fix: fix deployment resource requirement
    
    Signed-off-by: mlycore <[email protected]>
    
    * chore: fix unit test for UpdateDeployment
    
    Signed-off-by: mlycore <[email protected]>
    
    Signed-off-by: mlycore <[email protected]>
---
 .gitignore                                         |   2 +
 .../pkg/controllers/proxy_controller.go            |  78 ++++++++-----
 .../pkg/reconcile/deployment.go                    | 130 +++++++++++++++++----
 .../pkg/reconcile/reconcile_test.go                |  18 +--
 shardingsphere-operator/pkg/reconcile/service.go   |   5 +-
 5 files changed, 171 insertions(+), 62 deletions(-)

diff --git a/.gitignore b/.gitignore
index b2b4e7f..021b109 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,5 @@ shardingsphere-operator/config/
 charts/apache-shardingsphere-operator-cluster-charts/charts/
 charts/apache-shardingsphere-operator-charts/charts/
 **/Chart.lock
+test
+certs
diff --git a/shardingsphere-operator/pkg/controllers/proxy_controller.go 
b/shardingsphere-operator/pkg/controllers/proxy_controller.go
index 85bf3c8..e582448 100644
--- a/shardingsphere-operator/pkg/controllers/proxy_controller.go
+++ b/shardingsphere-operator/pkg/controllers/proxy_controller.go
@@ -84,21 +84,21 @@ func (r *ProxyReconciler) getRuntimeShardingSphereProxy(ctx 
context.Context, nam
 
 func (r *ProxyReconciler) reconcile(ctx context.Context, req ctrl.Request, rt 
*v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
        log := logger.FromContext(ctx)
-       if res, err := r.reconcileDeployment(ctx, req.NamespacedName, rt); err 
!= nil {
+       if res, err := r.reconcileDeployment(ctx, req.NamespacedName); err != 
nil {
                log.Error(err, "Error reconcile Deployment")
                return res, err
        }
 
-       if res, err := r.reconcileService(ctx, req.NamespacedName, rt); err != 
nil {
+       if res, err := r.reconcileService(ctx, req.NamespacedName); err != nil {
                log.Error(err, "Error reconcile Service")
                return res, err
        }
-       if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name, rt); 
err != nil {
+       if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name); err != 
nil {
                log.Error(err, "Error reconcile Pod list")
                return res, err
        }
 
-       if res, err := r.reconcileHPA(ctx, req.NamespacedName, rt); err != nil {
+       if res, err := r.reconcileHPA(ctx, req.NamespacedName); err != nil {
                log.Error(err, "Error reconcile HPA")
                return res, err
        }
@@ -106,11 +106,15 @@ func (r *ProxyReconciler) reconcile(ctx context.Context, 
req ctrl.Request, rt *v
        return ctrl.Result{}, nil
 }
 
-func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, 
namespacedName types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) 
(ctrl.Result, error) {
+func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, 
namespacedName types.NamespacedName) (ctrl.Result, error) {
+       ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+       if err != nil {
+               return ctrl.Result{}, err
+       }
+
        deploy := &appsv1.Deployment{}
 
-       var err error
-       if err = r.Get(ctx, namespacedName, deploy); err != nil {
+       if err := r.Get(ctx, namespacedName, deploy); err != nil {
                if !apierrors.IsNotFound(err) {
                        return ctrl.Result{}, err
                } else {
@@ -123,13 +127,11 @@ func (r *ProxyReconciler) reconcileDeployment(ctx 
context.Context, namespacedNam
                }
        } else {
                act := deploy.DeepCopy()
-               exp := reconcile.UpdateDeployment(ssproxy, act)
-
-               //FIXME: using diff to trigger update
-               // if reflect.DeepEqual(act.Spec.Template, exp.Spec.Template) {
-               //      return ctrl.Result{}, nil
-               // }
 
+               exp := reconcile.UpdateDeployment(ssproxy, act)
+               if err != nil {
+                       return ctrl.Result{}, err
+               }
                if err := r.Update(ctx, exp); err != nil {
                        return ctrl.Result{Requeue: true}, err
                }
@@ -137,11 +139,15 @@ func (r *ProxyReconciler) reconcileDeployment(ctx 
context.Context, namespacedNam
        return ctrl.Result{}, nil
 }
 
-func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName 
types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, 
error) {
+func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName 
types.NamespacedName) (ctrl.Result, error) {
+       ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+       if err != nil {
+               return ctrl.Result{}, err
+       }
+
        hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
 
-       var err error
-       if err = r.Get(ctx, namespacedName, hpa); err != nil {
+       if err := r.Get(ctx, namespacedName, hpa); err != nil {
                if !apierrors.IsNotFound(err) {
                        return ctrl.Result{}, err
                } else {
@@ -171,11 +177,15 @@ func (r *ProxyReconciler) reconcileHPA(ctx 
context.Context, namespacedName types
        return ctrl.Result{}, nil
 }
 
-func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName 
types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, 
error) {
+func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName 
types.NamespacedName) (ctrl.Result, error) {
+       ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+       if err != nil {
+               return ctrl.Result{}, err
+       }
+
        service := &v1.Service{}
 
-       var err error
-       if err = r.Get(ctx, namespacedName, service); err != nil {
+       if err := r.Get(ctx, namespacedName, service); err != nil {
                if !apierrors.IsNotFound(err) {
                        return ctrl.Result{}, err
                } else {
@@ -190,8 +200,8 @@ func (r *ProxyReconciler) reconcileService(ctx 
context.Context, namespacedName t
                }
        } else {
                act := service.DeepCopy()
-               reconcile.UpdateService(ssproxy, act)
-               if err := r.Update(ctx, act); err != nil {
+               exp := reconcile.UpdateService(ssproxy, act)
+               if err := r.Update(ctx, exp); err != nil {
                        return ctrl.Result{}, err
                }
        }
@@ -199,7 +209,7 @@ func (r *ProxyReconciler) reconcileService(ctx 
context.Context, namespacedName t
        return ctrl.Result{}, nil
 }
 
-func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, 
name string, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, 
name string) (ctrl.Result, error) {
        podList := &v1.PodList{}
        if err := r.List(ctx, podList, client.InNamespace(namespace), 
client.MatchingLabels(map[string]string{"apps": name})); err != nil {
                return ctrl.Result{}, err
@@ -207,27 +217,35 @@ func (r *ProxyReconciler) reconcilePodList(ctx 
context.Context, namespace, name
 
        result := ctrl.Result{}
        readyNodes := reconcile.CountingReadyPods(podList)
+
+       rt, err := r.getRuntimeShardingSphereProxy(ctx, types.NamespacedName{
+               Namespace: namespace,
+               Name:      name,
+       })
+       if err != nil {
+               return ctrl.Result{}, err
+       }
        if reconcile.IsRunning(podList) {
                if readyNodes < miniReadyCount {
                        result.RequeueAfter = WaitingForReady
-                       if readyNodes != ssproxy.Status.ReadyNodes {
-                               ssproxy.SetPodStarted(readyNodes)
+                       if readyNodes != rt.Status.ReadyNodes {
+                               rt.SetPodStarted(readyNodes)
                        }
                } else {
-                       if ssproxy.Status.Phase != v1alpha1.StatusReady {
-                               ssproxy.SetReady(readyNodes)
-                       } else if readyNodes != ssproxy.Spec.Replicas {
-                               ssproxy.UpdateReadyNodes(readyNodes)
+                       if rt.Status.Phase != v1alpha1.StatusReady {
+                               rt.SetReady(readyNodes)
+                       } else if readyNodes != rt.Spec.Replicas {
+                               rt.UpdateReadyNodes(readyNodes)
                        }
                }
        } else {
                // TODO: Waiting for pods to start exceeds the maximum number 
of retries
-               ssproxy.SetPodNotStarted(readyNodes)
+               rt.SetPodNotStarted(readyNodes)
                result.RequeueAfter = WaitingForReady
        }
 
        // TODO: Compare Status with or without modification
-       if err := r.Status().Update(ctx, ssproxy); err != nil {
+       if err := r.Status().Update(ctx, rt); err != nil {
                return result, err
        }
 
diff --git a/shardingsphere-operator/pkg/reconcile/deployment.go 
b/shardingsphere-operator/pkg/reconcile/deployment.go
index fb1bb0f..9734763 100644
--- a/shardingsphere-operator/pkg/reconcile/deployment.go
+++ b/shardingsphere-operator/pkg/reconcile/deployment.go
@@ -19,7 +19,6 @@ package reconcile
 
 import (
        "fmt"
-       "html/template"
        "reflect"
        "strconv"
        "strings"
@@ -130,8 +129,13 @@ func processOptionalParameter(proxy 
*v1alpha1.ShardingSphereProxy, dp *v1.Deploy
        return dp
 }
 
-func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
+const script = `wget 
https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar;
+wget 
https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar.md5;
+if [ $(md5sum /mysql-connector-java-${VERSION}.jar | cut -d ' ' -f1) = $(cat 
/mysql-connector-java-${VERSION}.jar.md5) ];
+then echo success;
+else echo failed;exit 1;fi;mv /mysql-connector-java-${VERSION}.jar 
/opt/shardingsphere-proxy/ext-lib`
 
+func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
        if len(dp.Spec.Template.Spec.InitContainers) == 0 {
                dp.Spec.Template.Spec.Containers[0].VolumeMounts = 
append(dp.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
                        Name:      "mysql-connect-jar",
@@ -146,18 +150,17 @@ func addInitContainer(dp *v1.Deployment, mysql 
*v1alpha1.MySQLDriver) {
                })
        }
 
-       scriptStr := strings.Builder{}
-       t1, _ := template.New("shell").Parse(`wget 
https://repo1.maven.org/maven2/mysql/mysql-connector-java/{{ .Version 
}}/mysql-connector-java-{{ .Version }}.jar;
-wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/{{ .Version 
}}/mysql-connector-java-{{ .Version }}.jar.md5;
-if [ $(md5sum /mysql-connector-java-{{ .Version }}.jar | cut -d ' ' -f1) = 
$(cat /mysql-connector-java-{{ .Version }}.jar.md5) ];
-then echo success;
-else echo failed;exit 1;fi;mv /mysql-connector-java-{{ .Version }}.jar 
/opt/shardingsphere-proxy/ext-lib`)
-       _ = t1.Execute(&scriptStr, mysql)
        dp.Spec.Template.Spec.InitContainers = []corev1.Container{
                {
                        Name:    "download-mysql-connect",
                        Image:   "busybox:1.35.0",
-                       Command: []string{"/bin/sh", "-c", scriptStr.String()},
+                       Command: []string{"/bin/sh", "-c", script},
+                       Env: []corev1.EnvVar{
+                               {
+                                       Name:  "VERSION",
+                                       Value: mysql.Version,
+                               },
+                       },
                        VolumeMounts: []corev1.VolumeMount{
                                {
                                        Name:      "mysql-connect-jar",
@@ -171,25 +174,108 @@ else echo failed;exit 1;fi;mv /mysql-connector-java-{{ 
.Version }}.jar /opt/shar
 
 // UpdateDeployment FIXME:merge UpdateDeployment and 
ConstructCascadingDeployment
 func UpdateDeployment(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) 
*v1.Deployment {
+       exp := act.DeepCopy()
+
        if proxy.Spec.AutomaticScaling == nil || 
!proxy.Spec.AutomaticScaling.Enable {
-               act.Spec.Replicas = &proxy.Spec.Replicas
+               exp.Spec.Replicas = updateReplicas(proxy, act)
        }
+       exp.Spec.Template = updatePodTemplateSpec(proxy, act)
+       return exp
+}
 
-       act.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:%s", 
imageName, proxy.Spec.Version)
-       act.Spec.Template.Spec.Containers[0].Env[0].Value = 
strconv.FormatInt(int64(proxy.Spec.Port), 10)
-       act.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort = 
proxy.Spec.Port
+func updateReplicas(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) 
*int32 {
+       if *act.Spec.Replicas != proxy.Spec.Replicas {
+               return &proxy.Spec.Replicas
+       }
+       return act.Spec.Replicas
+}
 
-       act.Spec.Template.Spec.Containers[0].Resources = proxy.Spec.Resources
-       act.Spec.Template.Spec.Containers[0].LivenessProbe = 
proxy.Spec.LivenessProbe
-       act.Spec.Template.Spec.Containers[0].ReadinessProbe = 
proxy.Spec.ReadinessProbe
-       act.Spec.Template.Spec.Containers[0].StartupProbe = 
proxy.Spec.StartupProbe
+func updatePodTemplateSpec(proxy *v1alpha1.ShardingSphereProxy, act 
*v1.Deployment) corev1.PodTemplateSpec {
+       exp := act.Spec.Template.DeepCopy()
 
-       act.Spec.Template.Spec.Volumes[0].ConfigMap.Name = 
proxy.Spec.ProxyConfigName
+       SSProxyContainer := updateSSProxyContainer(proxy, act)
+       for i, _ := range exp.Spec.Containers {
+               if exp.Spec.Containers[i].Name == "proxy" {
+                       exp.Spec.Containers[i] = *SSProxyContainer
+               }
+       }
 
-       if proxy.Spec.MySQLDriver.Version != "" {
-               addInitContainer(act, proxy.Spec.MySQLDriver)
+       initContainer := updateInitContainer(proxy, act)
+       for i, _ := range exp.Spec.InitContainers {
+               if exp.Spec.InitContainers[i].Name == "download-mysql-connect" {
+                       exp.Spec.InitContainers[i] = *initContainer
+               }
        }
 
-       exp := act.DeepCopy()
+       configName := updateConfigName(proxy, act)
+       exp.Spec.Volumes[0].ConfigMap.Name = configName
+
+       return *exp
+}
+
+func updateConfigName(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) 
string {
+       if act.Spec.Template.Spec.Volumes[0].ConfigMap.Name != 
proxy.Spec.ProxyConfigName {
+               return proxy.Spec.ProxyConfigName
+       }
+       return act.Spec.Template.Spec.Volumes[0].ConfigMap.Name
+}
+
+func updateInitContainer(proxy *v1alpha1.ShardingSphereProxy, act 
*v1.Deployment) *corev1.Container {
+       var exp *corev1.Container
+
+       for _, c := range act.Spec.Template.Spec.InitContainers {
+               if c.Name == "download-mysql-connect" {
+                       for i, _ := range c.Env {
+                               if c.Env[i].Name == "VERSION" {
+                                       if c.Env[i].Value != 
proxy.Spec.MySQLDriver.Version {
+                                               c.Env[i].Value = 
proxy.Spec.MySQLDriver.Version
+                                       }
+                               }
+                       }
+                       exp = c.DeepCopy()
+               }
+       }
+
+       return exp
+}
+
+func updateSSProxyContainer(proxy *v1alpha1.ShardingSphereProxy, act 
*v1.Deployment) *corev1.Container {
+       var exp *corev1.Container
+
+       for _, c := range act.Spec.Template.Spec.Containers {
+               if c.Name == "proxy" {
+                       exp = c.DeepCopy()
+
+                       tag := strings.Split(c.Image, ":")[1]
+                       if tag != proxy.Spec.Version {
+                               exp.Image = fmt.Sprintf("%s:%s", imageName, 
proxy.Spec.Version)
+                       }
+
+                       exp.Resources = proxy.Spec.Resources
+
+                       if proxy.Spec.LivenessProbe != nil && 
!reflect.DeepEqual(c.LivenessProbe, *proxy.Spec.LivenessProbe) {
+                               exp.LivenessProbe = proxy.Spec.LivenessProbe
+                       }
+
+                       if proxy.Spec.ReadinessProbe != nil && 
!reflect.DeepEqual(c.ReadinessProbe, *proxy.Spec.ReadinessProbe) {
+                               exp.ReadinessProbe = proxy.Spec.ReadinessProbe
+                       }
+
+                       if proxy.Spec.StartupProbe != nil && 
!reflect.DeepEqual(c.StartupProbe, *proxy.Spec.StartupProbe) {
+                               exp.StartupProbe = proxy.Spec.StartupProbe
+                       }
+
+                       for i, _ := range c.Env {
+                               if c.Env[i].Name == "PORT" {
+                                       proxyPort := 
strconv.FormatInt(int64(proxy.Spec.Port), 10)
+                                       if c.Env[i].Value != proxyPort {
+                                               c.Env[i].Value = proxyPort
+                                               exp.Ports[0].ContainerPort = 
proxy.Spec.Port
+                                       }
+                               }
+                       }
+                       exp.Env = c.Env
+               }
+       }
        return exp
 }
diff --git a/shardingsphere-operator/pkg/reconcile/reconcile_test.go 
b/shardingsphere-operator/pkg/reconcile/reconcile_test.go
index 9a12ef4..d7580c1 100644
--- a/shardingsphere-operator/pkg/reconcile/reconcile_test.go
+++ b/shardingsphere-operator/pkg/reconcile/reconcile_test.go
@@ -860,15 +860,15 @@ func Test_UpdateDeployment(t *testing.T) {
        }
 
        for _, c := range cases {
-               UpdateDeployment(c.proxy, c.deploy)
-               assert.Equal(t, fmt.Sprintf("%s:%s", imageName, 
c.proxy.Spec.Version), c.deploy.Spec.Template.Spec.Containers[0].Image, 
c.message)
-               assert.Equal(t, c.proxy.Spec.Replicas, *c.deploy.Spec.Replicas, 
c.message)
-               assert.Equal(t, c.proxy.Spec.ProxyConfigName, 
c.deploy.Spec.Template.Spec.Volumes[0].ConfigMap.Name, c.message)
-               assert.Equal(t, c.proxy.Spec.Port, 
c.deploy.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort, c.message)
-               assert.EqualValues(t, c.proxy.Spec.Resources, 
c.deploy.Spec.Template.Spec.Containers[0].Resources, c.message)
-               assert.EqualValues(t, c.proxy.Spec.LivenessProbe, 
c.deploy.Spec.Template.Spec.Containers[0].LivenessProbe, c.message)
-               assert.EqualValues(t, c.proxy.Spec.ReadinessProbe, 
c.deploy.Spec.Template.Spec.Containers[0].ReadinessProbe, c.message)
-               assert.EqualValues(t, c.proxy.Spec.StartupProbe, 
c.deploy.Spec.Template.Spec.Containers[0].StartupProbe, c.message)
+               exp := UpdateDeployment(c.proxy, c.deploy)
+               assert.Equal(t, fmt.Sprintf("%s:%s", imageName, 
c.proxy.Spec.Version), exp.Spec.Template.Spec.Containers[0].Image, c.message)
+               assert.Equal(t, c.proxy.Spec.Replicas, *exp.Spec.Replicas, 
c.message)
+               assert.Equal(t, c.proxy.Spec.ProxyConfigName, 
exp.Spec.Template.Spec.Volumes[0].ConfigMap.Name, c.message)
+               assert.Equal(t, c.proxy.Spec.Port, 
exp.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort, c.message)
+               assert.EqualValues(t, c.proxy.Spec.Resources, 
exp.Spec.Template.Spec.Containers[0].Resources, c.message)
+               assert.EqualValues(t, c.proxy.Spec.LivenessProbe, 
exp.Spec.Template.Spec.Containers[0].LivenessProbe, c.message)
+               assert.EqualValues(t, c.proxy.Spec.ReadinessProbe, 
exp.Spec.Template.Spec.Containers[0].ReadinessProbe, c.message)
+               assert.EqualValues(t, c.proxy.Spec.StartupProbe, 
exp.Spec.Template.Spec.Containers[0].StartupProbe, c.message)
        }
 }
 
diff --git a/shardingsphere-operator/pkg/reconcile/service.go 
b/shardingsphere-operator/pkg/reconcile/service.go
index 3fdf693..65cefc2 100644
--- a/shardingsphere-operator/pkg/reconcile/service.go
+++ b/shardingsphere-operator/pkg/reconcile/service.go
@@ -62,11 +62,14 @@ func ConstructCascadingService(proxy 
*v1alpha1.ShardingSphereProxy) *v1.Service
        return &svc
 }
 
-func UpdateService(proxy *v1alpha1.ShardingSphereProxy, runtimeService 
*v1.Service) {
+func UpdateService(proxy *v1alpha1.ShardingSphereProxy, runtimeService 
*v1.Service) *v1.Service {
+       exp := &v1.Service{}
        runtimeService.Spec.Type = proxy.Spec.ServiceType.Type
        runtimeService.Spec.Ports[0].Port = proxy.Spec.Port
        runtimeService.Spec.Ports[0].TargetPort = fromInt32(proxy.Spec.Port)
        if proxy.Spec.ServiceType.NodePort != 0 {
                runtimeService.Spec.Ports[0].NodePort = 
proxy.Spec.ServiceType.NodePort
        }
+       exp = runtimeService.DeepCopy()
+       return exp
 }

Reply via email to