This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c825d6  add event and status.phase to control resource create check (#312)
4c825d6 is described below

commit 4c825d6f85c57672fc59f013c6c164a64703059d
Author: moomman <[email protected]>
AuthorDate: Mon Apr 17 17:51:26 2023 +0800

    add event and status.phase to control resource create check (#312)
    
    * add phase status, controller chaos
    
    * fix phase logic, handle job and chaos create
    
    * fix phase logic, handle job and chaos create
    
    * fix phase logic, handle job and chaos create
    
    ---------
    
    Co-authored-by: moonman <[email protected]>
---
 .../api/v1alpha1/shardingsphere_chaos_types.go     |  28 ++--
 shardingsphere-operator/build/tools/Dockerfile     |   6 +-
 .../cmd/shardingsphere-operator/manager/option.go  |   1 +
 .../controllers/shardingsphere_chaos_controller.go | 175 +++++++++++++++++----
 .../reconcile/shardingspherechaos/chaos-mesh.go    |  14 +-
 .../pkg/reconcile/shardingspherechaos/job.go       |   9 +-
 6 files changed, 169 insertions(+), 64 deletions(-)

diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index 5b99a64..32c2126 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -44,8 +44,6 @@ type ShardingSphereChaos struct {
 type ShardingSphereChaosSpec struct {
        InjectJob  JobSpec `json:"injectJob,omitempty"`
        EmbedChaos `json:",inline"`
-       //todo
-       //Verify batchV1Beta1.JobTemplateSpec `json:"Verify,omitempty"`
 }
 
 // JobSpec Specifies the config of job to create
@@ -69,32 +67,28 @@ type EmbedChaos struct {
 type ChaosCondition string
 
 const (
-       Creating     ChaosCondition = "Creating"
        AllRecovered ChaosCondition = "AllRecovered"
        Paused       ChaosCondition = "Paused"
        AllInjected  ChaosCondition = "AllInjected"
        NoTarget     ChaosCondition = "NoTarget"
-       UnKnown      ChaosCondition = "UnKnown"
-)
-
-// Jobschedule Show current job progress
-type Jobschedule string
-
-const (
-       JobCreating Jobschedule = "JobCreating"
-       JobFailed   Jobschedule = "JobFailed"
-       JobFinish   Jobschedule = "JobFinish"
+       Unknown      ChaosCondition = "Unknown"
 )
 
 // ShardingSphereChaosStatus defines the actual state of ShardingSphereChaos
 type ShardingSphereChaosStatus struct {
        ChaosCondition ChaosCondition `json:"chaosCondition"`
-       //todo
-       //InjectStatus   Jobschedule         `json:"InjectStatus"`
-       //todo
-       //VerifyStatus   Jobschedule         `json:"VerifyStatus"`
+       Phase          Phase          `json:"phase"`
 }
 
+type Phase string
+
+var (
+       PhaseBeforeExperiment Phase = "BeforeReq"
+       PhaseAfterExperiment  Phase = "AfterReq"
+       PhaseInChaos          Phase = "Injected"
+       PhaseRecoveredChaos   Phase = "Recovered"
+)
+
 // PodChaosAction Specify the action type of pod Chaos
 type PodChaosAction string
 
diff --git a/shardingsphere-operator/build/tools/Dockerfile b/shardingsphere-operator/build/tools/Dockerfile
index 17f3e4d..d73511b 100644
--- a/shardingsphere-operator/build/tools/Dockerfile
+++ b/shardingsphere-operator/build/tools/Dockerfile
@@ -25,7 +25,6 @@ ENV ZOOKEEPER_DIR /app/zookeeper
 WORKDIR /app
 RUN mkdir -p "/app/start" && chmod -R 777 /app/start
 CMD ["tail -f /dev/null"]
-ENTRYPOINT ["sh","-c"]
 RUN set -eux; \
         \
         apt-get update; \
@@ -43,7 +42,8 @@ RUN set -eux; \
         wget -O zookeeper.tar.gz "${ZOOKEEPER_DOWNLOAD_URL}"; \
         mkdir -p ${ZOOKEEPER_DIR}; \
         tar -zxf zookeeper.tar.gz -C ${ZOOKEEPER_DIR} --strip-components 1; \
-        rm zookeeper.tar.gz; \
-        \
+        rm zookeeper.tar.gz;
+
+ENTRYPOINT ["sh","-c"]
 
 
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index 53f4d16..10ff9f5 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -146,6 +146,7 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
                        Chaos:     chaos.NewChaos(mgr.GetClient()),
                        Job:       job.NewJob(mgr.GetClient()),
                        ConfigMap: configmap.NewConfigMap(mgr.GetClient()),
+                       Events:    
mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
                }).SetupWithManager(mgr); err != nil {
                        logger.Error(err, "unable to create controller", 
"controller", "ShardingSphereChaos")
                        return err
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index e44d97d..95dfa97 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -19,8 +19,11 @@ package controllers
 
 import (
        "context"
+       "fmt"
        "time"
 
+       "k8s.io/client-go/tools/record"
+
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
        v1 "k8s.io/api/core/v1"
 
@@ -40,7 +43,7 @@ import (
 
 const (
        ShardingSphereChaosControllerName = "shardingsphere-chaos-controller"
-       ssChaosDefaultEnqueueTime         = 5 * time.Second
+       ssChaosDefaultEnqueueTime         = 10 * time.Second
 )
 
 // ShardingSphereChaosReconciler is a controller for the ShardingSphereChaos
@@ -51,44 +54,77 @@ type ShardingSphereChaosReconciler struct { //
        Chaos     chaos.Chaos
        Job       job.Job
        ConfigMap configmap.ConfigMap
+       Events    record.EventRecorder
 }
 
 // Reconcile handles main function of this controller
 func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req 
ctrl.Request) (ctrl.Result, error) {
        logger := r.Log.WithValues(ShardingSphereChaosControllerName, 
req.NamespacedName)
 
-       var ssChaos sschaosv1alpha1.ShardingSphereChaos
-       if err := r.Get(ctx, req.NamespacedName, &ssChaos); err != nil {
-               logger.Error(err, "unable to fetch ShardingSphereChaos source")
-               return ctrl.Result{}, client.IgnoreNotFound(err)
+       ssChaos, err := r.getRuntimeSSChaos(ctx, req.NamespacedName)
+       if err != nil {
+               return ctrl.Result{}, err
        }
 
        if !ssChaos.ObjectMeta.DeletionTimestamp.IsZero() {
                return ctrl.Result{}, nil
        }
-
        logger.Info("start reconcile chaos")
-       if err := r.reconcileChaos(ctx, &ssChaos); err != nil {
+       if err := r.reconcileChaos(ctx, ssChaos); err != nil {
+               if err == reconcile.ErrChangedSpec {
+                       errHandle := r.handleChaosChange(ctx, 
req.NamespacedName)
+                       return ctrl.Result{}, errHandle
+               }
                logger.Error(err, " unable to reconcile chaos")
+               r.Events.Event(ssChaos, "Warning", "chaos err", err.Error())
                return ctrl.Result{}, err
        }
-       if err := r.reconcileConfigMap(ctx, &ssChaos); err != nil {
+       if err := r.reconcileConfigMap(ctx, ssChaos); err != nil {
                logger.Error(err, "unable to reconcile configmap")
+               r.Events.Event(ssChaos, "Warning", "configmap err", err.Error())
                return ctrl.Result{}, err
        }
-       if err := r.reconcileJob(ctx, &ssChaos); err != nil {
+       if err := r.reconcileJob(ctx, ssChaos); err != nil {
                logger.Error(err, "unable to reconcile job")
+               r.Events.Event(ssChaos, "Warning", "job err", err.Error())
                return ctrl.Result{}, err
        }
-       if err := r.reconcileStatus(ctx, &ssChaos); err != nil {
+       if err := r.reconcileStatus(ctx, req.NamespacedName); err != nil {
+               r.Events.Event(ssChaos, "Warning", "update status error", 
err.Error())
                logger.Error(err, "failed to update status")
        }
 
        return ctrl.Result{RequeueAfter: ssChaosDefaultEnqueueTime}, nil
 }
 
+func (r *ShardingSphereChaosReconciler) handleChaosChange(ctx context.Context, 
name types.NamespacedName) error {
+
+       ssChaos, err := r.getRuntimeSSChaos(ctx, name)
+       if err != nil {
+               return err
+       }
+       if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
+               ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
+               if err := r.Status().Update(ctx, ssChaos); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (r *ShardingSphereChaosReconciler) getRuntimeSSChaos(ctx context.Context, 
name types.NamespacedName) (*sschaosv1alpha1.ShardingSphereChaos, error) {
+       var rt = &sschaosv1alpha1.ShardingSphereChaos{}
+       err := r.Get(ctx, name, rt)
+       return rt, client.IgnoreNotFound(err)
+}
+
 func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, 
ssChao *sschaosv1alpha1.ShardingSphereChaos) error {
        logger := r.Log.WithValues("reconcile chaos", ssChao.Name)
+       if ssChao.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || 
ssChao.Status.Phase == "" {
+               fmt.Println("reach here")
+               return nil
+       }
+       fmt.Println("reach here  after")
        namespaceName := types.NamespacedName{Namespace: ssChao.Namespace, 
Name: ssChao.Name}
        if ssChao.Spec.EmbedChaos.PodChaos != nil {
                chao, isExist, err := r.getPodChaosByNamespacedName(ctx, 
namespaceName)
@@ -133,49 +169,92 @@ func (r *ShardingSphereChaosReconciler) 
reconcileConfigMap(ctx context.Context,
 func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, 
ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
        logger := r.Log.WithValues("reconcile job", ssChaos.Name)
        namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, 
Name: ssChaos.Name}
-
        rJob, isExist, err := r.getJobByNamespacedName(ctx, namespaceName)
        if err != nil {
                logger.Error(err, "get job err")
                return err
        }
-       //todo:update InjectRequirement by chaos status
+       var nowInjectRequirement reconcile.InjectRequirement
+       if ssChaos.Status.Phase == "" || ssChaos.Status.Phase == 
sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == 
sschaosv1alpha1.PhaseAfterExperiment {
+               nowInjectRequirement = reconcile.Experimental
+       }
+       if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos || 
ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+               nowInjectRequirement = reconcile.Pressure
+       }
        if isExist {
-               return r.updateJob(ctx, reconcile.Experimental, ssChaos, rJob)
+               return r.updateJob(ctx, nowInjectRequirement, ssChaos, rJob)
        }
 
-       return r.createJob(ctx, reconcile.Experimental, ssChaos)
+       return r.createJob(ctx, nowInjectRequirement, ssChaos)
 }
 
-func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, 
ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
-       var (
-               chaoCondition  sschaosv1alpha1.ChaosCondition
-               namespacedName = types.NamespacedName{
-                       Namespace: ssChaos.Namespace,
-                       Name:      ssChaos.Name,
+func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, 
namespacedName types.NamespacedName) error {
+       ssChaos, err := r.getRuntimeSSChaos(ctx, namespacedName)
+       if err != nil {
+               return err
+       }
+       if ssChaos.Status.Phase == "" {
+               ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
+       }
+       rJob := &batchV1.Job{}
+       if err := r.Get(ctx, namespacedName, rJob); err != nil {
+               return err
+       }
+
+       if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment && 
rJob.Status.Succeeded == 1 {
+               ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
+       }
+
+       if err := r.updatePhaseStart(ctx, ssChaos); err != nil {
+               return err
+       }
+
+       rt, err := r.getRuntimeSSChaos(ctx, namespacedName)
+       if err != nil {
+               return err
+       }
+       rt.Status = ssChaos.Status
+       return r.Status().Update(ctx, rt)
+}
+
+func (r *ShardingSphereChaosReconciler) updatePhaseStart(ctx context.Context, 
ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
+       if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
+               if err := r.updateChaosCondition(ctx, ssChaos); err != nil {
+                       return err
                }
-       )
+
+               if ssChaos.Status.ChaosCondition == sschaosv1alpha1.AllInjected 
&& ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
+                       ssChaos.Status.Phase = sschaosv1alpha1.PhaseInChaos
+               }
+
+               if ssChaos.Status.ChaosCondition == 
sschaosv1alpha1.AllRecovered && ssChaos.Status.Phase == 
sschaosv1alpha1.PhaseInChaos {
+                       ssChaos.Status.Phase = 
sschaosv1alpha1.PhaseRecoveredChaos
+               }
+       }
+
+       return nil
+}
+
+func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx 
context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
+       namespacedName := types.NamespacedName{
+               Namespace: ssChaos.Namespace,
+               Name:      ssChaos.Name,
+       }
        if ssChaos.Spec.EmbedChaos.PodChaos != nil {
                chao, err := r.Chaos.GetPodChaosByNamespacedName(ctx, 
namespacedName)
                if err != nil {
                        return err
                }
-               chaoCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
+               ssChaos.Status.ChaosCondition = r.Chaos.ConvertChaosStatus(ctx, 
ssChaos, chao)
        } else if ssChaos.Spec.EmbedChaos.NetworkChaos != nil {
                chao, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, 
namespacedName)
                if err != nil {
                        return err
                }
-               chaoCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
+               ssChaos.Status.ChaosCondition = r.Chaos.ConvertChaosStatus(ctx, 
ssChaos, chao)
        }
 
-       var rt sschaosv1alpha1.ShardingSphereChaos
-       if err := r.Get(ctx, namespacedName, &rt); err != nil {
-               return err
-       }
-       ssChaos.Status.ChaosCondition = chaoCondition
-       rt.Status = ssChaos.Status
-       return r.Status().Update(ctx, &rt)
+       return nil
 }
 
 func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, 
bool, error) {
@@ -277,11 +356,20 @@ func (r *ShardingSphereChaosReconciler) createJob(ctx 
context.Context, requireme
        if err == nil && apierrors.IsAlreadyExists(err) {
                return nil
        }
+
        return err
 }
 
 func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context, 
chao *sschaosv1alpha1.ShardingSphereChaos, podChaos reconcile.PodChaos) error {
-       return r.Chaos.UpdatePodChaos(ctx, chao, podChaos)
+       err := r.Chaos.UpdatePodChaos(ctx, chao, podChaos)
+       if err != nil {
+               if err == reconcile.ErrNotChanged {
+                       return nil
+               }
+               return err
+       }
+       r.Events.Event(chao, "Normal", "applied", fmt.Sprintf("podChaos %s", 
"new changes updated"))
+       return reconcile.ErrChangedSpec
 }
 
 func (r *ShardingSphereChaosReconciler) CreatePodChaos(ctx context.Context, 
chao *sschaosv1alpha1.ShardingSphereChaos) error {
@@ -289,11 +377,24 @@ func (r *ShardingSphereChaosReconciler) 
CreatePodChaos(ctx context.Context, chao
        if err != nil {
                return err
        }
-       return r.Chaos.CreatePodChaos(ctx, podChaos)
+       err = r.Chaos.CreatePodChaos(ctx, podChaos)
+       if err != nil {
+               return err
+       }
+       r.Events.Event(chao, "Normal", "created", fmt.Sprintf("podChaos %s", " 
is created successfully"))
+       return nil
 }
 
 func (r *ShardingSphereChaosReconciler) updateNetWorkChaos(ctx 
context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, netWorkChaos 
reconcile.NetworkChaos) error {
-       return r.Chaos.UpdateNetworkChaos(ctx, chao, netWorkChaos)
+       err := r.Chaos.UpdateNetworkChaos(ctx, chao, netWorkChaos)
+       if err != nil {
+               if err == reconcile.ErrNotChanged {
+                       return nil
+               }
+               return err
+       }
+       r.Events.Event(chao, "Normal", "applied", fmt.Sprintf("networkChaos 
%s", "new changes updated"))
+       return reconcile.ErrChangedSpec
 }
 
 func (r *ShardingSphereChaosReconciler) CreateNetworkChaos(ctx 
context.Context, chao *sschaosv1alpha1.ShardingSphereChaos) error {
@@ -301,7 +402,13 @@ func (r *ShardingSphereChaosReconciler) 
CreateNetworkChaos(ctx context.Context,
        if err != nil {
                return err
        }
-       return r.Chaos.CreateNetworkChaos(ctx, networkChaos)
+       err = r.Chaos.CreateNetworkChaos(ctx, networkChaos)
+       if err != nil {
+               return err
+       }
+
+       r.Events.Event(chao, "Normal", "created", fmt.Sprintf("networkChaos 
%s", "  is created successfully"))
+       return nil
 }
 
 // SetupWithManager sets up the controller with the Manager.
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/chaos-mesh.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/chaos-mesh.go
index 459a744..21bc8f6 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/chaos-mesh.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/chaos-mesh.go
@@ -50,7 +50,9 @@ const (
 )
 
 var (
-       ErrConvert = errors.New("can not convert chaos interface to specify 
struct")
+       ErrConvert     = errors.New("can not convert chaos interface to specify 
struct")
+       ErrNotChanged  = errors.New("object not changed")
+       ErrChangedSpec = errors.New("change spec")
 )
 
 type chaosMeshHandler struct {
@@ -67,13 +69,13 @@ func (c *chaosMeshHandler) ConvertChaosStatus(ctx 
context.Context, ssChaos *v1al
                if podChao, ok := chaos.(*chaosv1alpha1.PodChaos); ok && 
podChao != nil {
                        status = *podChao.GetStatus()
                } else {
-                       return v1alpha1.UnKnown
+                       return v1alpha1.Unknown
                }
        } else if ssChaos.Spec.EmbedChaos.NetworkChaos != nil {
                if networkChaos, ok := chaos.(*chaosv1alpha1.NetworkChaos); ok 
&& networkChaos != nil {
                        status = *networkChaos.GetStatus()
                }
-               return v1alpha1.UnKnown
+               return v1alpha1.Unknown
        }
        var conditions = map[chaosv1alpha1.ChaosConditionType]bool{}
        for i := range status.Conditions {
@@ -102,7 +104,7 @@ func judgeCondition(condition 
map[chaosv1alpha1.ChaosConditionType]bool, phase c
                }
        }
 
-       return v1alpha1.UnKnown
+       return v1alpha1.Unknown
 }
 
 func (c *chaosMeshHandler) CreatePodChaos(ctx context.Context, chao PodChaos) 
error {
@@ -302,7 +304,7 @@ func (c *chaosMeshHandler) UpdateNetworkChaos(ctx 
context.Context, ssChaos *v1al
        }
        isEqual := reflect.DeepEqual(reExp.Spec, reCur.Spec)
        if isEqual {
-               return nil
+               return ErrNotChanged
        }
 
        if err := c.r.Create(ctx, reCur); err != nil {
@@ -331,7 +333,7 @@ func (c *chaosMeshHandler) UpdatePodChaos(ctx 
context.Context, ssChaos *v1alpha1
        }
        isEqual := reflect.DeepEqual(reExp.Spec, reCur.Spec)
        if isEqual {
-               return nil
+               return ErrNotChanged
        }
 
        if err := c.r.Delete(ctx, reCur); err != nil {
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
index 06a3bfc..9f0ac74 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
@@ -121,7 +121,7 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, 
requirement InjectRequirement
        cbd.SetImage(DefaultImageName)
        cbd.SetName(DefaultContainerName)
        cbd.SetVolumeMount(vm)
-       cbd.SetCommand([]string{"sh", "-c"})
+       cbd.SetCommand([]string{"sh"})
        container := cbd.Build()
        container.Args = NewCmds(requirement)
        jbd.SetContainers(container)
@@ -129,15 +129,16 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, 
requirement InjectRequirement
        return rjob, nil
 }
 
-func NewCmds(requirement InjectRequirement) (cmds []string) {
-
+func NewCmds(requirement InjectRequirement) []string {
+       var cmds []string
+       cmds = append(cmds, "-c")
        if requirement == Experimental {
                cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, 
configExperimental))
        }
        if requirement == Pressure {
                cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, 
configExperimental), fmt.Sprintf("%s/%s", DefaultWorkPath, configPressure))
        }
-       return
+       return cmds
 }
 
 func MustInt32(s string) (int32, error) {

Reply via email to