This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new ab2fe68 refactor: refactor reconcile for different workloads (#150)
ab2fe68 is described below
commit ab2fe68eff5ddad29aa3a65f944bc8dabe7d13e8
Author: liyao <[email protected]>
AuthorDate: Tue Dec 20 12:59:08 2022 +0800
refactor: refactor reconcile for different workloads (#150)
* chore: update gitignore
Signed-off-by: mlycore <[email protected]>
* refactor: update deployment reconcile
Signed-off-by: mlycore <[email protected]>
* refactor: remove diff
Signed-off-by: mlycore <[email protected]>
* refactor: add exp to UpdateService
Signed-off-by: mlycore <[email protected]>
* refactor: move proxy to near update
Signed-off-by: mlycore <[email protected]>
* fix: fix deployment resource requirement
Signed-off-by: mlycore <[email protected]>
* chore: fix unit test for UpdateDeployment
Signed-off-by: mlycore <[email protected]>
Signed-off-by: mlycore <[email protected]>
---
.gitignore | 2 +
.../pkg/controllers/proxy_controller.go | 78 ++++++++-----
.../pkg/reconcile/deployment.go | 130 +++++++++++++++++----
.../pkg/reconcile/reconcile_test.go | 18 +--
shardingsphere-operator/pkg/reconcile/service.go | 5 +-
5 files changed, 171 insertions(+), 62 deletions(-)
diff --git a/.gitignore b/.gitignore
index b2b4e7f..021b109 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,5 @@ shardingsphere-operator/config/
charts/apache-shardingsphere-operator-cluster-charts/charts/
charts/apache-shardingsphere-operator-charts/charts/
**/Chart.lock
+test
+certs
diff --git a/shardingsphere-operator/pkg/controllers/proxy_controller.go b/shardingsphere-operator/pkg/controllers/proxy_controller.go
index 85bf3c8..e582448 100644
--- a/shardingsphere-operator/pkg/controllers/proxy_controller.go
+++ b/shardingsphere-operator/pkg/controllers/proxy_controller.go
@@ -84,21 +84,21 @@ func (r *ProxyReconciler) getRuntimeShardingSphereProxy(ctx
context.Context, nam
func (r *ProxyReconciler) reconcile(ctx context.Context, req ctrl.Request, rt
*v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
log := logger.FromContext(ctx)
- if res, err := r.reconcileDeployment(ctx, req.NamespacedName, rt); err
!= nil {
+ if res, err := r.reconcileDeployment(ctx, req.NamespacedName); err !=
nil {
log.Error(err, "Error reconcile Deployment")
return res, err
}
- if res, err := r.reconcileService(ctx, req.NamespacedName, rt); err !=
nil {
+ if res, err := r.reconcileService(ctx, req.NamespacedName); err != nil {
log.Error(err, "Error reconcile Service")
return res, err
}
- if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name, rt);
err != nil {
+ if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name); err !=
nil {
log.Error(err, "Error reconcile Pod list")
return res, err
}
- if res, err := r.reconcileHPA(ctx, req.NamespacedName, rt); err != nil {
+ if res, err := r.reconcileHPA(ctx, req.NamespacedName); err != nil {
log.Error(err, "Error reconcile HPA")
return res, err
}
@@ -106,11 +106,15 @@ func (r *ProxyReconciler) reconcile(ctx context.Context,
req ctrl.Request, rt *v
return ctrl.Result{}, nil
}
-func (r *ProxyReconciler) reconcileDeployment(ctx context.Context,
namespacedName types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy)
(ctrl.Result, error) {
+func (r *ProxyReconciler) reconcileDeployment(ctx context.Context,
namespacedName types.NamespacedName) (ctrl.Result, error) {
+ ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
deploy := &appsv1.Deployment{}
- var err error
- if err = r.Get(ctx, namespacedName, deploy); err != nil {
+ if err := r.Get(ctx, namespacedName, deploy); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
} else {
@@ -123,13 +127,11 @@ func (r *ProxyReconciler) reconcileDeployment(ctx
context.Context, namespacedNam
}
} else {
act := deploy.DeepCopy()
- exp := reconcile.UpdateDeployment(ssproxy, act)
-
- //FIXME: using diff to trigger update
- // if reflect.DeepEqual(act.Spec.Template, exp.Spec.Template) {
- // return ctrl.Result{}, nil
- // }
+ exp := reconcile.UpdateDeployment(ssproxy, act)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
if err := r.Update(ctx, exp); err != nil {
return ctrl.Result{Requeue: true}, err
}
@@ -137,11 +139,15 @@ func (r *ProxyReconciler) reconcileDeployment(ctx
context.Context, namespacedNam
return ctrl.Result{}, nil
}
-func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName
types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result,
error) {
+func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName
types.NamespacedName) (ctrl.Result, error) {
+ ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
- var err error
- if err = r.Get(ctx, namespacedName, hpa); err != nil {
+ if err := r.Get(ctx, namespacedName, hpa); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
} else {
@@ -171,11 +177,15 @@ func (r *ProxyReconciler) reconcileHPA(ctx
context.Context, namespacedName types
return ctrl.Result{}, nil
}
-func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName
types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result,
error) {
+func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName
types.NamespacedName) (ctrl.Result, error) {
+ ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
service := &v1.Service{}
- var err error
- if err = r.Get(ctx, namespacedName, service); err != nil {
+ if err := r.Get(ctx, namespacedName, service); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
} else {
@@ -190,8 +200,8 @@ func (r *ProxyReconciler) reconcileService(ctx
context.Context, namespacedName t
}
} else {
act := service.DeepCopy()
- reconcile.UpdateService(ssproxy, act)
- if err := r.Update(ctx, act); err != nil {
+ exp := reconcile.UpdateService(ssproxy, act)
+ if err := r.Update(ctx, exp); err != nil {
return ctrl.Result{}, err
}
}
@@ -199,7 +209,7 @@ func (r *ProxyReconciler) reconcileService(ctx
context.Context, namespacedName t
return ctrl.Result{}, nil
}
-func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace,
name string, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace,
name string) (ctrl.Result, error) {
podList := &v1.PodList{}
if err := r.List(ctx, podList, client.InNamespace(namespace),
client.MatchingLabels(map[string]string{"apps": name})); err != nil {
return ctrl.Result{}, err
@@ -207,27 +217,35 @@ func (r *ProxyReconciler) reconcilePodList(ctx
context.Context, namespace, name
result := ctrl.Result{}
readyNodes := reconcile.CountingReadyPods(podList)
+
+ rt, err := r.getRuntimeShardingSphereProxy(ctx, types.NamespacedName{
+ Namespace: namespace,
+ Name: name,
+ })
+ if err != nil {
+ return ctrl.Result{}, err
+ }
if reconcile.IsRunning(podList) {
if readyNodes < miniReadyCount {
result.RequeueAfter = WaitingForReady
- if readyNodes != ssproxy.Status.ReadyNodes {
- ssproxy.SetPodStarted(readyNodes)
+ if readyNodes != rt.Status.ReadyNodes {
+ rt.SetPodStarted(readyNodes)
}
} else {
- if ssproxy.Status.Phase != v1alpha1.StatusReady {
- ssproxy.SetReady(readyNodes)
- } else if readyNodes != ssproxy.Spec.Replicas {
- ssproxy.UpdateReadyNodes(readyNodes)
+ if rt.Status.Phase != v1alpha1.StatusReady {
+ rt.SetReady(readyNodes)
+ } else if readyNodes != rt.Spec.Replicas {
+ rt.UpdateReadyNodes(readyNodes)
}
}
} else {
// TODO: Waiting for pods to start exceeds the maximum number of retries
- ssproxy.SetPodNotStarted(readyNodes)
+ rt.SetPodNotStarted(readyNodes)
result.RequeueAfter = WaitingForReady
}
// TODO: Compare Status with or without modification
- if err := r.Status().Update(ctx, ssproxy); err != nil {
+ if err := r.Status().Update(ctx, rt); err != nil {
return result, err
}
diff --git a/shardingsphere-operator/pkg/reconcile/deployment.go b/shardingsphere-operator/pkg/reconcile/deployment.go
index fb1bb0f..9734763 100644
--- a/shardingsphere-operator/pkg/reconcile/deployment.go
+++ b/shardingsphere-operator/pkg/reconcile/deployment.go
@@ -19,7 +19,6 @@ package reconcile
import (
"fmt"
- "html/template"
"reflect"
"strconv"
"strings"
@@ -130,8 +129,13 @@ func processOptionalParameter(proxy
*v1alpha1.ShardingSphereProxy, dp *v1.Deploy
return dp
}
-func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
+const script = `wget
https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar;
+wget
https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar.md5;
+if [ $(md5sum /mysql-connector-java-${VERSION}.jar | cut -d ' ' -f1) = $(cat
/mysql-connector-java-${VERSION}.jar.md5) ];
+then echo success;
+else echo failed;exit 1;fi;mv /mysql-connector-java-${VERSION}.jar
/opt/shardingsphere-proxy/ext-lib`
+func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
if len(dp.Spec.Template.Spec.InitContainers) == 0 {
dp.Spec.Template.Spec.Containers[0].VolumeMounts =
append(dp.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
Name: "mysql-connect-jar",
@@ -146,18 +150,17 @@ func addInitContainer(dp *v1.Deployment, mysql
*v1alpha1.MySQLDriver) {
})
}
- scriptStr := strings.Builder{}
- t1, _ := template.New("shell").Parse(`wget
https://repo1.maven.org/maven2/mysql/mysql-connector-java/{{ .Version
}}/mysql-connector-java-{{ .Version }}.jar;
-wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/{{ .Version
}}/mysql-connector-java-{{ .Version }}.jar.md5;
-if [ $(md5sum /mysql-connector-java-{{ .Version }}.jar | cut -d ' ' -f1) =
$(cat /mysql-connector-java-{{ .Version }}.jar.md5) ];
-then echo success;
-else echo failed;exit 1;fi;mv /mysql-connector-java-{{ .Version }}.jar
/opt/shardingsphere-proxy/ext-lib`)
- _ = t1.Execute(&scriptStr, mysql)
dp.Spec.Template.Spec.InitContainers = []corev1.Container{
{
Name: "download-mysql-connect",
Image: "busybox:1.35.0",
- Command: []string{"/bin/sh", "-c", scriptStr.String()},
+ Command: []string{"/bin/sh", "-c", script},
+ Env: []corev1.EnvVar{
+ {
+ Name: "VERSION",
+ Value: mysql.Version,
+ },
+ },
VolumeMounts: []corev1.VolumeMount{
{
Name: "mysql-connect-jar",
@@ -171,25 +174,108 @@ else echo failed;exit 1;fi;mv /mysql-connector-java-{{
.Version }}.jar /opt/shar
// UpdateDeployment FIXME:merge UpdateDeployment and ConstructCascadingDeployment
func UpdateDeployment(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment)
*v1.Deployment {
+ exp := act.DeepCopy()
+
if proxy.Spec.AutomaticScaling == nil ||
!proxy.Spec.AutomaticScaling.Enable {
- act.Spec.Replicas = &proxy.Spec.Replicas
+ exp.Spec.Replicas = updateReplicas(proxy, act)
}
+ exp.Spec.Template = updatePodTemplateSpec(proxy, act)
+ return exp
+}
- act.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:%s",
imageName, proxy.Spec.Version)
- act.Spec.Template.Spec.Containers[0].Env[0].Value =
strconv.FormatInt(int64(proxy.Spec.Port), 10)
- act.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort =
proxy.Spec.Port
+func updateReplicas(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment)
*int32 {
+ if *act.Spec.Replicas != proxy.Spec.Replicas {
+ return &proxy.Spec.Replicas
+ }
+ return act.Spec.Replicas
+}
- act.Spec.Template.Spec.Containers[0].Resources = proxy.Spec.Resources
- act.Spec.Template.Spec.Containers[0].LivenessProbe =
proxy.Spec.LivenessProbe
- act.Spec.Template.Spec.Containers[0].ReadinessProbe =
proxy.Spec.ReadinessProbe
- act.Spec.Template.Spec.Containers[0].StartupProbe =
proxy.Spec.StartupProbe
+func updatePodTemplateSpec(proxy *v1alpha1.ShardingSphereProxy, act
*v1.Deployment) corev1.PodTemplateSpec {
+ exp := act.Spec.Template.DeepCopy()
- act.Spec.Template.Spec.Volumes[0].ConfigMap.Name =
proxy.Spec.ProxyConfigName
+ SSProxyContainer := updateSSProxyContainer(proxy, act)
+ for i, _ := range exp.Spec.Containers {
+ if exp.Spec.Containers[i].Name == "proxy" {
+ exp.Spec.Containers[i] = *SSProxyContainer
+ }
+ }
- if proxy.Spec.MySQLDriver.Version != "" {
- addInitContainer(act, proxy.Spec.MySQLDriver)
+ initContainer := updateInitContainer(proxy, act)
+ for i, _ := range exp.Spec.InitContainers {
+ if exp.Spec.InitContainers[i].Name == "download-mysql-connect" {
+ exp.Spec.InitContainers[i] = *initContainer
+ }
}
- exp := act.DeepCopy()
+ configName := updateConfigName(proxy, act)
+ exp.Spec.Volumes[0].ConfigMap.Name = configName
+
+ return *exp
+}
+
+func updateConfigName(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment)
string {
+ if act.Spec.Template.Spec.Volumes[0].ConfigMap.Name !=
proxy.Spec.ProxyConfigName {
+ return proxy.Spec.ProxyConfigName
+ }
+ return act.Spec.Template.Spec.Volumes[0].ConfigMap.Name
+}
+
+func updateInitContainer(proxy *v1alpha1.ShardingSphereProxy, act
*v1.Deployment) *corev1.Container {
+ var exp *corev1.Container
+
+ for _, c := range act.Spec.Template.Spec.InitContainers {
+ if c.Name == "download-mysql-connect" {
+ for i, _ := range c.Env {
+ if c.Env[i].Name == "VERSION" {
+ if c.Env[i].Value !=
proxy.Spec.MySQLDriver.Version {
+ c.Env[i].Value =
proxy.Spec.MySQLDriver.Version
+ }
+ }
+ }
+ exp = c.DeepCopy()
+ }
+ }
+
+ return exp
+}
+
+func updateSSProxyContainer(proxy *v1alpha1.ShardingSphereProxy, act
*v1.Deployment) *corev1.Container {
+ var exp *corev1.Container
+
+ for _, c := range act.Spec.Template.Spec.Containers {
+ if c.Name == "proxy" {
+ exp = c.DeepCopy()
+
+ tag := strings.Split(c.Image, ":")[1]
+ if tag != proxy.Spec.Version {
+ exp.Image = fmt.Sprintf("%s:%s", imageName,
proxy.Spec.Version)
+ }
+
+ exp.Resources = proxy.Spec.Resources
+
+ if proxy.Spec.LivenessProbe != nil &&
!reflect.DeepEqual(c.LivenessProbe, *proxy.Spec.LivenessProbe) {
+ exp.LivenessProbe = proxy.Spec.LivenessProbe
+ }
+
+ if proxy.Spec.ReadinessProbe != nil &&
!reflect.DeepEqual(c.ReadinessProbe, *proxy.Spec.ReadinessProbe) {
+ exp.ReadinessProbe = proxy.Spec.ReadinessProbe
+ }
+
+ if proxy.Spec.StartupProbe != nil &&
!reflect.DeepEqual(c.StartupProbe, *proxy.Spec.StartupProbe) {
+ exp.StartupProbe = proxy.Spec.StartupProbe
+ }
+
+ for i, _ := range c.Env {
+ if c.Env[i].Name == "PORT" {
+ proxyPort :=
strconv.FormatInt(int64(proxy.Spec.Port), 10)
+ if c.Env[i].Value != proxyPort {
+ c.Env[i].Value = proxyPort
+ exp.Ports[0].ContainerPort =
proxy.Spec.Port
+ }
+ }
+ }
+ exp.Env = c.Env
+ }
+ }
return exp
}
diff --git a/shardingsphere-operator/pkg/reconcile/reconcile_test.go b/shardingsphere-operator/pkg/reconcile/reconcile_test.go
index 9a12ef4..d7580c1 100644
--- a/shardingsphere-operator/pkg/reconcile/reconcile_test.go
+++ b/shardingsphere-operator/pkg/reconcile/reconcile_test.go
@@ -860,15 +860,15 @@ func Test_UpdateDeployment(t *testing.T) {
}
for _, c := range cases {
- UpdateDeployment(c.proxy, c.deploy)
- assert.Equal(t, fmt.Sprintf("%s:%s", imageName,
c.proxy.Spec.Version), c.deploy.Spec.Template.Spec.Containers[0].Image,
c.message)
- assert.Equal(t, c.proxy.Spec.Replicas, *c.deploy.Spec.Replicas,
c.message)
- assert.Equal(t, c.proxy.Spec.ProxyConfigName,
c.deploy.Spec.Template.Spec.Volumes[0].ConfigMap.Name, c.message)
- assert.Equal(t, c.proxy.Spec.Port,
c.deploy.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort, c.message)
- assert.EqualValues(t, c.proxy.Spec.Resources,
c.deploy.Spec.Template.Spec.Containers[0].Resources, c.message)
- assert.EqualValues(t, c.proxy.Spec.LivenessProbe,
c.deploy.Spec.Template.Spec.Containers[0].LivenessProbe, c.message)
- assert.EqualValues(t, c.proxy.Spec.ReadinessProbe,
c.deploy.Spec.Template.Spec.Containers[0].ReadinessProbe, c.message)
- assert.EqualValues(t, c.proxy.Spec.StartupProbe,
c.deploy.Spec.Template.Spec.Containers[0].StartupProbe, c.message)
+ exp := UpdateDeployment(c.proxy, c.deploy)
+ assert.Equal(t, fmt.Sprintf("%s:%s", imageName,
c.proxy.Spec.Version), exp.Spec.Template.Spec.Containers[0].Image, c.message)
+ assert.Equal(t, c.proxy.Spec.Replicas, *exp.Spec.Replicas,
c.message)
+ assert.Equal(t, c.proxy.Spec.ProxyConfigName,
exp.Spec.Template.Spec.Volumes[0].ConfigMap.Name, c.message)
+ assert.Equal(t, c.proxy.Spec.Port,
exp.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort, c.message)
+ assert.EqualValues(t, c.proxy.Spec.Resources,
exp.Spec.Template.Spec.Containers[0].Resources, c.message)
+ assert.EqualValues(t, c.proxy.Spec.LivenessProbe,
exp.Spec.Template.Spec.Containers[0].LivenessProbe, c.message)
+ assert.EqualValues(t, c.proxy.Spec.ReadinessProbe,
exp.Spec.Template.Spec.Containers[0].ReadinessProbe, c.message)
+ assert.EqualValues(t, c.proxy.Spec.StartupProbe,
exp.Spec.Template.Spec.Containers[0].StartupProbe, c.message)
}
}
diff --git a/shardingsphere-operator/pkg/reconcile/service.go b/shardingsphere-operator/pkg/reconcile/service.go
index 3fdf693..65cefc2 100644
--- a/shardingsphere-operator/pkg/reconcile/service.go
+++ b/shardingsphere-operator/pkg/reconcile/service.go
@@ -62,11 +62,14 @@ func ConstructCascadingService(proxy
*v1alpha1.ShardingSphereProxy) *v1.Service
return &svc
}
-func UpdateService(proxy *v1alpha1.ShardingSphereProxy, runtimeService
*v1.Service) {
+func UpdateService(proxy *v1alpha1.ShardingSphereProxy, runtimeService
*v1.Service) *v1.Service {
+ exp := &v1.Service{}
runtimeService.Spec.Type = proxy.Spec.ServiceType.Type
runtimeService.Spec.Ports[0].Port = proxy.Spec.Port
runtimeService.Spec.Ports[0].TargetPort = fromInt32(proxy.Spec.Port)
if proxy.Spec.ServiceType.NodePort != 0 {
runtimeService.Spec.Ports[0].NodePort =
proxy.Spec.ServiceType.NodePort
}
+ exp = runtimeService.DeepCopy()
+ return exp
}