This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 0632954  add reconcile logic and verfity Spec
     new ef582e2  Merge pull request #316 from moomman/main
0632954 is described below

commit 063295453bcc3f3c629f5e02c598bdcfba5a87ab
Author: moonman <[email protected]>
AuthorDate: Fri Apr 21 15:25:54 2023 +0800

    add reconcile logic and verfity Spec
---
 .../api/v1alpha1/shardingsphere_chaos_types.go     |  20 ++
 .../api/v1alpha1/zz_generated.deepcopy.go          |  57 ++-
 shardingsphere-operator/build/tools/Dockerfile     |   2 +
 .../cmd/shardingsphere-operator/manager/manager.go |   1 -
 .../cmd/shardingsphere-operator/manager/option.go  |   7 +
 .../controllers/shardingsphere_chaos_controller.go | 391 +++++++++++++++++----
 .../pkg/reconcile/shardingspherechaos/configmap.go |   9 +-
 .../pkg/reconcile/shardingspherechaos/job.go       |  47 ++-
 .../shardingspherechaos_suite_test.go              |   7 +
 9 files changed, 460 insertions(+), 81 deletions(-)

diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go 
b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index 32c2126..50ac996 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -44,6 +44,11 @@ type ShardingSphereChaos struct {
 type ShardingSphereChaosSpec struct {
        InjectJob  JobSpec `json:"injectJob,omitempty"`
        EmbedChaos `json:",inline"`
+       Expect     Expect `json:"expect,omitempty"`
+}
+
+type Expect struct {
+       Verify string `json:"verify,omitempty"`
 }
 
 // JobSpec Specifies the config of job to create
@@ -54,6 +59,8 @@ type JobSpec struct {
        Pressure string `json:"pressure,omitempty"`
        // +optional
        Position string `json:"position,omitempty"`
+       // +optional
+       Verify string `json:"verify,omitempty"`
 }
 
 type EmbedChaos struct {
@@ -78,6 +85,18 @@ const (
 type ShardingSphereChaosStatus struct {
        ChaosCondition ChaosCondition `json:"chaosCondition"`
        Phase          Phase          `json:"phase"`
+       Result         []Result       `json:"result"`
+}
+
+// Result represents the result of the ShardingSphereChaos
+type Result struct {
+       Success bool   `json:"success"`
+       Detail  Detail `json:"details"`
+}
+
+type Detail struct {
+       Time metav1.Time `json:"time"`
+       Msg  string      `json:"message"`
 }
 
 type Phase string
@@ -85,6 +104,7 @@ type Phase string
 var (
        PhaseBeforeExperiment Phase = "BeforeReq"
        PhaseAfterExperiment  Phase = "AfterReq"
+       PhaseCreatingChaos    Phase = "Creating"
        PhaseInChaos          Phase = "Injected"
        PhaseRecoveredChaos   Phase = "Recovered"
 )
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go 
b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index 9f4063b..5f9622f 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -491,6 +491,22 @@ func (in *DelayActionParams) DeepCopy() *DelayActionParams 
{
        return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *Detail) DeepCopyInto(out *Detail) {
+       *out = *in
+       in.Time.DeepCopyInto(&out.Time)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new Detail.
+func (in *Detail) DeepCopy() *Detail {
+       if in == nil {
+               return nil
+       }
+       out := new(Detail)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *DuplicateActionParams) DeepCopyInto(out *DuplicateActionParams) {
        *out = *in
@@ -531,6 +547,21 @@ func (in *EmbedChaos) DeepCopy() *EmbedChaos {
        return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *Expect) DeepCopyInto(out *Expect) {
+       *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new Expect.
+func (in *Expect) DeepCopy() *Expect {
+       if in == nil {
+               return nil
+       }
+       out := new(Expect)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *JobSpec) DeepCopyInto(out *JobSpec) {
        *out = *in
@@ -1158,6 +1189,22 @@ func (in *RepositoryConfig) DeepCopy() *RepositoryConfig 
{
        return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *Result) DeepCopyInto(out *Result) {
+       *out = *in
+       in.Detail.DeepCopyInto(&out.Detail)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new Result.
+func (in *Result) DeepCopy() *Result {
+       if in == nil {
+               return nil
+       }
+       out := new(Result)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *ServerConfig) DeepCopyInto(out *ServerConfig) {
        *out = *in
@@ -1225,7 +1272,7 @@ func (in *ShardingSphereChaos) DeepCopyInto(out 
*ShardingSphereChaos) {
        out.TypeMeta = in.TypeMeta
        in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
        in.Spec.DeepCopyInto(&out.Spec)
-       out.Status = in.Status
+       in.Status.DeepCopyInto(&out.Status)
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new ShardingSphereChaos.
@@ -1283,6 +1330,7 @@ func (in *ShardingSphereChaosSpec) DeepCopyInto(out 
*ShardingSphereChaosSpec) {
        *out = *in
        out.InjectJob = in.InjectJob
        in.EmbedChaos.DeepCopyInto(&out.EmbedChaos)
+       out.Expect = in.Expect
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new ShardingSphereChaosSpec.
@@ -1298,6 +1346,13 @@ func (in *ShardingSphereChaosSpec) DeepCopy() 
*ShardingSphereChaosSpec {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *ShardingSphereChaosStatus) DeepCopyInto(out 
*ShardingSphereChaosStatus) {
        *out = *in
+       if in.Result != nil {
+               in, out := &in.Result, &out.Result
+               *out = make([]Result, len(*in))
+               for i := range *in {
+                       (*in)[i].DeepCopyInto(&(*out)[i])
+               }
+       }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new ShardingSphereChaosStatus.
diff --git a/shardingsphere-operator/build/tools/Dockerfile 
b/shardingsphere-operator/build/tools/Dockerfile
index d73511b..fc3ad0f 100644
--- a/shardingsphere-operator/build/tools/Dockerfile
+++ b/shardingsphere-operator/build/tools/Dockerfile
@@ -33,6 +33,8 @@ RUN set -eux; \
         apt-get install -y --no-install-recommends libncursesw6 libreadline8 
libssh-4; \
         apt-get install -y --no-install-recommends openjdk-11-jre-headless; \
         echo "export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11-amd64" >> 
/etc/profile; \
+        echo "alias mysql=\"mysqlsh\"" >> /etc/profile; \
+        . /etc/profile; \
         \
         rm -rf /var/lib/apt/lists/*; \
         \
diff --git 
a/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go 
b/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
index 06166a9..05681f0 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
@@ -49,7 +49,6 @@ func SetupWithOptions(opts *Options) *Manager {
                logger.Error(err, "unable to start manager")
                os.Exit(1)
        }
-
        if err = (&controllers.ProxyReconciler{
                Client: mgr.GetClient(),
                Scheme: mgr.GetScheme(),
diff --git 
a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go 
b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index 10ff9f5..e1e1fe5 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -21,6 +21,8 @@ import (
        "flag"
        "strings"
 
+       clientset "k8s.io/client-go/kubernetes"
+
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/job"
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaos"
@@ -139,6 +141,10 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
                return nil
        },
        "ShardingSphereChaos": func(mgr manager.Manager) error {
+               clientset, err := clientset.NewForConfig(mgr.GetConfig())
+               if err != nil {
+                       return err
+               }
                if err := (&controllers.ShardingSphereChaosReconciler{
                        Client:    mgr.GetClient(),
                        Scheme:    mgr.GetScheme(),
@@ -147,6 +153,7 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
                        Job:       job.NewJob(mgr.GetClient()),
                        ConfigMap: configmap.NewConfigMap(mgr.GetClient()),
                        Events:    
mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
+                       ClientSet: clientset,
                }).SetupWithManager(mgr); err != nil {
                        logger.Error(err, "unable to create controller", 
"controller", "ShardingSphereChaos")
                        return err
diff --git 
a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go 
b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 95dfa97..b84b0be 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -19,9 +19,18 @@ package controllers
 
 import (
        "context"
+       "errors"
        "fmt"
+       "strings"
        "time"
 
+       "k8s.io/apimachinery/pkg/util/wait"
+       "k8s.io/client-go/util/retry"
+
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+       clientset "k8s.io/client-go/kubernetes"
+
        "k8s.io/client-go/tools/record"
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
@@ -44,6 +53,20 @@ import (
 const (
        ShardingSphereChaosControllerName = "shardingsphere-chaos-controller"
        ssChaosDefaultEnqueueTime         = 10 * time.Second
+       VerifyJobCheck                    = "Verify"
+)
+
+var (
+       ErrNoPod = errors.New("no pod in list")
+)
+
+type JobCondition string
+
+var (
+       CompleteJob JobCondition = "complete"
+       FailureJob  JobCondition = "failure"
+       SuspendJob  JobCondition = "suspend"
+       ActiveJob   JobCondition = "active"
 )
 
 // ShardingSphereChaosReconciler is a controller for the ShardingSphereChaos
@@ -55,6 +78,7 @@ type ShardingSphereChaosReconciler struct { //
        Job       job.Job
        ConfigMap configmap.ConfigMap
        Events    record.EventRecorder
+       ClientSet *clientset.Clientset
 }
 
 // Reconcile handles main function of this controller
@@ -120,72 +144,92 @@ func (r *ShardingSphereChaosReconciler) 
getRuntimeSSChaos(ctx context.Context, n
 
 func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, 
ssChao *sschaosv1alpha1.ShardingSphereChaos) error {
        logger := r.Log.WithValues("reconcile chaos", ssChao.Name)
+
        if ssChao.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || 
ssChao.Status.Phase == "" {
-               fmt.Println("reach here")
                return nil
        }
-       fmt.Println("reach here  after")
        namespaceName := types.NamespacedName{Namespace: ssChao.Namespace, 
Name: ssChao.Name}
+
        if ssChao.Spec.EmbedChaos.PodChaos != nil {
-               chao, isExist, err := r.getPodChaosByNamespacedName(ctx, 
namespaceName)
+               chao, err := r.getPodChaosByNamespacedName(ctx, namespaceName)
                if err != nil {
                        logger.Error(err, "pod chaos err")
                        return err
                }
-               if isExist {
+               if chao != nil {
                        return r.updatePodChaos(ctx, ssChao, chao)
                }
                return r.CreatePodChaos(ctx, ssChao)
        } else if ssChao.Spec.EmbedChaos.NetworkChaos != nil {
-               chao, isExist, err := r.getNetworkChaosByNamespacedName(ctx, 
namespaceName)
+               chao, err := r.getNetworkChaosByNamespacedName(ctx, 
namespaceName)
                if err != nil {
                        logger.Error(err, "network chao err")
                        return err
                }
-               if isExist {
+               if chao != nil {
                        return r.updateNetWorkChaos(ctx, ssChao, chao)
                }
                return r.CreateNetworkChaos(ctx, ssChao)
        }
+
        return nil
 }
 
 func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx 
context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
        logger := r.Log.WithValues("reconcile configmap", ssChaos.Name)
        namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, 
Name: ssChaos.Name}
-       rConfigmap, isExist, err := r.getConfigMapByNamespacedName(ctx, 
namespaceName)
+       rConfigmap, err := r.getConfigMapByNamespacedName(ctx, namespaceName)
        if err != nil {
                logger.Error(err, "get configmap error")
                return err
        }
 
-       if isExist {
+       if rConfigmap != nil {
                return r.updateConfigMap(ctx, ssChaos, rConfigmap)
        }
 
-       return r.CreateConfigMap(ctx, ssChaos)
+       err = r.CreateConfigMap(ctx, ssChaos)
+       if err != nil {
+               r.Events.Event(ssChaos, "Warning", "Created", 
fmt.Sprintf("configmap created fail %s", err))
+               return err
+       }
+
+       r.Events.Event(ssChaos, "Normal", "Created", "configmap created 
successfully")
+       return nil
 }
 
 func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, 
ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
        logger := r.Log.WithValues("reconcile job", ssChaos.Name)
-       namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, 
Name: ssChaos.Name}
-       rJob, isExist, err := r.getJobByNamespacedName(ctx, namespaceName)
-       if err != nil {
-               logger.Error(err, "get job err")
-               return err
-       }
+
        var nowInjectRequirement reconcile.InjectRequirement
-       if ssChaos.Status.Phase == "" || ssChaos.Status.Phase == 
sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == 
sschaosv1alpha1.PhaseAfterExperiment {
+       switch {
+       case ssChaos.Status.Phase == "" || ssChaos.Status.Phase == 
sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == 
sschaosv1alpha1.PhaseAfterExperiment:
                nowInjectRequirement = reconcile.Experimental
-       }
-       if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos || 
ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+       case ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos:
                nowInjectRequirement = reconcile.Pressure
+       case ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos:
+               nowInjectRequirement = reconcile.Verify
+       }
+
+       namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, 
Name: reconcile.SetJobNamespaceName(ssChaos.Name, nowInjectRequirement)}
+
+       rJob, err := r.getJobByNamespacedName(ctx, namespaceName)
+       if err != nil {
+               logger.Error(err, "get job err")
+               return err
        }
-       if isExist {
+
+       if rJob != nil {
                return r.updateJob(ctx, nowInjectRequirement, ssChaos, rJob)
        }
 
-       return r.createJob(ctx, nowInjectRequirement, ssChaos)
+       err = r.createJob(ctx, nowInjectRequirement, ssChaos)
+       if err != nil {
+               return err
+       }
+
+       r.Events.Event(ssChaos, "Normal", "Created", fmt.Sprintf("%s job 
created successfully", string(nowInjectRequirement)))
+       return nil
 }
 
 func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, 
namespacedName types.NamespacedName) error {
@@ -193,17 +237,30 @@ func (r *ShardingSphereChaosReconciler) 
reconcileStatus(ctx context.Context, nam
        if err != nil {
                return err
        }
-       if ssChaos.Status.Phase == "" {
-               ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
-       }
-       rJob := &batchV1.Job{}
-       if err := r.Get(ctx, namespacedName, rJob); err != nil {
+
+       setDefault(ssChaos)
+
+       jobName := getRequirement(ssChaos)
+       rJob, err := r.getJobByNamespacedName(ctx, 
types.NamespacedName{Namespace: ssChaos.Namespace, Name: 
reconcile.SetJobNamespaceName(ssChaos.Name, jobName)})
+       if err != nil || rJob == nil {
                return err
        }
 
        if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment && 
rJob.Status.Succeeded == 1 {
                ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
        }
+       jobConditions := rJob.Status.Conditions
+       condition := getJobCondition(jobConditions)
+
+       if condition == FailureJob {
+               r.Events.Event(ssChaos, "Warning", "failed", fmt.Sprintf("job: 
%s", rJob.Name))
+       }
+       if ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+               if err := r.updateRecoveredJob(ctx, ssChaos, rJob); err != nil {
+                       r.Events.Event(ssChaos, "Warning", "getPodLog", 
err.Error())
+                       return err
+               }
+       }
 
        if err := r.updatePhaseStart(ctx, ssChaos); err != nil {
                return err
@@ -213,10 +270,180 @@ func (r *ShardingSphereChaosReconciler) 
reconcileStatus(ctx context.Context, nam
        if err != nil {
                return err
        }
-       rt.Status = ssChaos.Status
+       setRtStatus(rt, ssChaos)
        return r.Status().Update(ctx, rt)
 }
 
+func getRequirement(ssChaos *sschaosv1alpha1.ShardingSphereChaos) 
reconcile.InjectRequirement {
+       var jobName reconcile.InjectRequirement
+       if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || 
ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
+               jobName = reconcile.Experimental
+       }
+       if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos {
+               jobName = reconcile.Pressure
+       }
+       if ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+               jobName = reconcile.Verify
+       }
+       return jobName
+}
+
+func getJobCondition(conditions []batchV1.JobCondition) JobCondition {
+       var ret = ActiveJob
+       for i := range conditions {
+               p := &conditions[i]
+               switch {
+               case p.Type == batchV1.JobComplete && p.Status == 
v1.ConditionTrue:
+                       ret = CompleteJob
+               case p.Type == batchV1.JobFailed && p.Status == 
v1.ConditionTrue:
+                       ret = FailureJob
+               case p.Type == batchV1.JobSuspended && p.Status == 
v1.ConditionTrue:
+                       ret = SuspendJob
+               case p.Type == batchV1.JobFailureTarget:
+                       ret = FailureJob
+               }
+       }
+       return ret
+}
+
+func setDefault(ssChaos *sschaosv1alpha1.ShardingSphereChaos) {
+       if ssChaos.Status.Phase == "" {
+               ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
+       }
+       if ssChaos.Status.Result == nil {
+               ssChaos.Status.Result = []sschaosv1alpha1.Result{}
+       }
+}
+
+func setRtStatus(rt *sschaosv1alpha1.ShardingSphereChaos, ssChaos 
*sschaosv1alpha1.ShardingSphereChaos) {
+       rt.Status.Result = []sschaosv1alpha1.Result{}
+       for i := range ssChaos.Status.Result {
+               r := &ssChaos.Status.Result[i]
+               rt.Status.Result = append(rt.Status.Result, 
sschaosv1alpha1.Result{
+                       Success: r.Success,
+                       Detail: sschaosv1alpha1.Detail{
+                               Time: metav1.Time{Time: time.Now()},
+                               Msg:  r.Detail.Msg,
+                       },
+               })
+       }
+
+       rt.Status.Phase = ssChaos.Status.Phase
+       rt.Status.ChaosCondition = ssChaos.Status.ChaosCondition
+}
+
+func (r *ShardingSphereChaosReconciler) updateRecoveredJob(ctx 
context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos, rJob 
*batchV1.Job) error {
+       if isResultExist(rJob) {
+               return nil
+       }
+
+       for i := range ssChaos.Status.Result {
+               r := &ssChaos.Status.Result[i]
+               if strings.HasPrefix(r.Detail.Msg, VerifyJobCheck) {
+                       return nil
+               }
+       }
+
+       logOpts := &v1.PodLogOptions{}
+       pod, err := r.getPodHaveLog(ctx, rJob)
+       if err != nil || pod == nil {
+               return err
+       }
+       podNamespacedName := types.NamespacedName{
+               Namespace: pod.Namespace,
+               Name:      pod.Name,
+       }
+       condition := getJobCondition(rJob.Status.Conditions)
+       result := &sschaosv1alpha1.Result{}
+
+       if condition == CompleteJob {
+               log, err := r.getPodLog(ctx, podNamespacedName, logOpts)
+               if err != nil {
+                       return err
+               }
+               if ssChaos.Spec.Expect.Verify == "" || 
ssChaos.Spec.Expect.Verify == log {
+                       result.Success = true
+                       result.Detail = sschaosv1alpha1.Detail{
+                               Time: metav1.Time{Time: time.Now()},
+                               Msg:  fmt.Sprintf("%s: job succeeded", 
VerifyJobCheck),
+                       }
+               } else {
+                       result.Success = false
+                       result.Detail = sschaosv1alpha1.Detail{
+                               Time: metav1.Time{Time: time.Now()},
+                               Msg:  fmt.Sprintf("%s: %s", VerifyJobCheck, 
log),
+                       }
+               }
+       }
+
+       if condition == FailureJob {
+               log, err := r.getPodLog(ctx, podNamespacedName, logOpts)
+               if err != nil {
+                       return err
+               }
+               result.Success = false
+               result.Detail = sschaosv1alpha1.Detail{
+                       Time: metav1.Time{Time: time.Now()},
+                       Msg:  fmt.Sprintf("%s: %s", VerifyJobCheck, log),
+               }
+       }
+
+       ssChaos.Status.Result = updateResult(ssChaos.Status.Result, *result, 
VerifyJobCheck)
+
+       return nil
+}
+
+func (r *ShardingSphereChaosReconciler) getPodHaveLog(ctx context.Context, 
rJob *batchV1.Job) (*v1.Pod, error) {
+       pods := &v1.PodList{}
+       if err := r.List(ctx, pods, client.MatchingLabels{"controller-uid": 
rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
+               return nil, err
+       }
+       if pods.Items == nil {
+               return nil, nil
+       }
+       var pod *v1.Pod
+       for i := range pods.Items {
+               pod = &pods.Items[i]
+               break
+       }
+       return pod, nil
+}
+
+func isResultExist(rJob *batchV1.Job) bool {
+       for _, cmd := range rJob.Spec.Template.Spec.Containers[0].Args {
+               if strings.Contains(cmd, string(reconcile.Verify)) {
+                       return true
+               }
+       }
+       return false
+}
+
+func updateResult(results []sschaosv1alpha1.Result, r sschaosv1alpha1.Result, 
check string) []sschaosv1alpha1.Result {
+       for i := range results {
+               msg := results[i].Detail.Msg
+               if strings.HasPrefix(msg, check) && 
strings.HasPrefix(r.Detail.Msg, check) {
+                       results[i] = r
+                       return results
+               }
+       }
+       results = append(results, r)
+       return results
+}
+
+func (r *ShardingSphereChaosReconciler) getPodLog(ctx context.Context, 
namespacedName types.NamespacedName, options *v1.PodLogOptions) (string, error) 
{
+       req := 
r.ClientSet.CoreV1().Pods(namespacedName.Namespace).GetLogs(namespacedName.Name,
 options)
+       res := req.Do(ctx)
+       if res.Error() != nil {
+               return "", res.Error()
+       }
+       var ret []byte
+       ret, err := res.Raw()
+       if err != nil {
+               return "", err
+       }
+       return string(ret), nil
+}
+
 func (r *ShardingSphereChaosReconciler) updatePhaseStart(ctx context.Context, 
ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
        if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
                if err := r.updateChaosCondition(ctx, ssChaos); err != nil {
@@ -257,50 +484,37 @@ func (r *ShardingSphereChaosReconciler) 
updateChaosCondition(ctx context.Context
        return nil
 }
 
-func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, 
bool, error) {
+func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, 
error) {
        nc, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName)
        if err != nil {
-               return nil, false, err
-       }
-       if nc == nil {
-               return nil, false, nil
+               return nil, err
        }
-       return nc, true, nil
+       return nc, nil
 }
 
-func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (reconcile.PodChaos, 
bool, error) {
+func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (reconcile.PodChaos, 
error) {
        pc, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName)
        if err != nil {
-               return nil, false, err
-       }
-       if pc == nil {
-               return nil, false, nil
+               return nil, err
        }
-       return pc, true, nil
+       return pc, nil
 }
 
-func (r *ShardingSphereChaosReconciler) getConfigMapByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (*v1.ConfigMap, bool, 
error) {
+func (r *ShardingSphereChaosReconciler) getConfigMapByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (*v1.ConfigMap, error) {
        config, err := r.ConfigMap.GetByNamespacedName(ctx, namespacedName)
        if err != nil {
-               return nil, false, err
-       }
-       if config == nil {
-               return nil, false, nil
+               return nil, err
        }
 
-       return config, true, nil
+       return config, nil
 }
 
-func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (*batchV1.Job, bool, 
error) {
+func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (*batchV1.Job, error) {
        injectJob, err := r.Job.GetByNamespacedName(ctx, namespacedName)
        if err != nil {
-               return nil, false, err
-       }
-       if injectJob == nil {
-               return nil, false, nil
+               return nil, err
        }
-
-       return injectJob, true, nil
+       return injectJob, nil
 }
 
 func (r *ShardingSphereChaosReconciler) updateConfigMap(ctx context.Context, 
chao *sschaosv1alpha1.ShardingSphereChaos, cur *v1.ConfigMap) error {
@@ -325,39 +539,87 @@ func (r *ShardingSphereChaosReconciler) 
CreateConfigMap(ctx context.Context, cha
 }
 
 func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, 
requirement reconcile.InjectRequirement, chao 
*sschaosv1alpha1.ShardingSphereChaos, cur *batchV1.Job) error {
-       exp, err := reconcile.UpdateJob(chao, requirement, cur)
+       isEqual, err := reconcile.IsJobChanged(chao, requirement, cur)
        if err != nil {
                return err
        }
-       if exp != nil {
-               if err := r.Delete(ctx, cur); err != nil {
-                       return err
-               }
-               if err := ctrl.SetControllerReference(chao, exp, r.Scheme); err 
!= nil {
-                       return err
-               }
-               if err := r.Create(ctx, exp); err != nil {
+       if !isEqual {
+               if err := r.Delete(ctx, cur); err != nil && 
!apierrors.IsNotFound(err) {
                        return err
                }
+               r.Events.Event(chao, "Normal", "Updated", "job Updated")
        }
        return nil
 }
 
-// todo:
 func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, 
requirement reconcile.InjectRequirement, chao 
*sschaosv1alpha1.ShardingSphereChaos) error {
        injectJob, err := reconcile.NewJob(chao, requirement)
+       if err != nil {
+               return err
+       }
        if err := ctrl.SetControllerReference(chao, injectJob, r.Scheme); err 
!= nil {
                return err
        }
+
+       err = r.Create(ctx, injectJob)
        if err != nil {
+               return client.IgnoreAlreadyExists(err)
+       }
+
+       rJob := &batchV1.Job{}
+       backoff := wait.Backoff{
+               Steps:    6,
+               Duration: 500 * time.Millisecond,
+               Factor:   5.0,
+               Jitter:   0.1,
+       }
+
+       if err := retry.OnError(backoff, func(e error) bool {
+               return true
+       }, func() error {
+               return r.Get(ctx, types.NamespacedName{Namespace: 
chao.Namespace, Name: reconcile.SetJobNamespaceName(chao.Name, requirement)}, 
rJob)
+       }); err != nil {
                return err
        }
-       err = r.Create(ctx, injectJob)
-       if err == nil && apierrors.IsAlreadyExists(err) {
+
+       podList := &v1.PodList{}
+       if err := retry.OnError(backoff, func(e error) bool {
+               return e != nil
+       }, func() error {
+               if err := r.List(ctx, podList, 
client.MatchingLabels{"controller-uid": 
rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
+                       return err
+               }
+               if len(podList.Items) == 0 {
+                       return ErrNoPod
+               }
                return nil
+       }); err != nil {
+               return err
        }
 
-       return err
+       for i := range podList.Items {
+               rPod := &podList.Items[i]
+               if err := ctrl.SetControllerReference(rJob, rPod, r.Scheme); 
err != nil {
+                       return err
+               }
+
+               exp := rPod.DeepCopy()
+               updateBackoff := wait.Backoff{
+                       Steps:    5,
+                       Duration: 30 * time.Millisecond,
+                       Factor:   5.0,
+                       Jitter:   0.1,
+               }
+               if err := retry.RetryOnConflict(updateBackoff, func() error {
+                       if err := r.Update(ctx, exp); err != nil {
+                               return err
+                       }
+                       return nil
+               }); err != nil {
+                       return err
+               }
+       }
+       return nil
 }
 
 func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context, 
chao *sschaosv1alpha1.ShardingSphereChaos, podChaos reconcile.PodChaos) error {
@@ -417,6 +679,7 @@ func (r *ShardingSphereChaosReconciler) 
SetupWithManager(mgr ctrl.Manager) error
                For(&sschaosv1alpha1.ShardingSphereChaos{}).
                Owns(&chaosv1alpha1.PodChaos{}).
                Owns(&chaosv1alpha1.NetworkChaos{}).
+               Owns(&v1.ConfigMap{}).
                Owns(&batchV1.Job{}).
                Complete(r)
 }
diff --git 
a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go 
b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
index f3054be..48d2f20 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
@@ -29,6 +29,7 @@ import (
 const (
        configExperimental = "experimental.sh"
        configPressure     = "pressure.sh"
+       configVerify       = "verify.sh"
 )
 
 const (
@@ -40,7 +41,7 @@ func NewSSConfigMap(chaos *v1alpha1.ShardingSphereChaos) 
*v1.ConfigMap {
 
        
cmb.SetName(chaos.Name).SetNamespace(chaos.Namespace).SetLabels(chaos.Labels)
 
-       
cmb.SetExperimental(chaos.Spec.InjectJob.Experimental).SetPressure(chaos.Spec.InjectJob.Pressure)
+       
cmb.SetExperimental(chaos.Spec.InjectJob.Experimental).SetPressure(chaos.Spec.InjectJob.Pressure).SetVerify(chaos.Spec.InjectJob.Verify)
 
        return cmb.Build()
 }
@@ -50,6 +51,7 @@ type SSConfigMapBuilder interface {
        common.ConfigMapBuilder
        SetExperimental(string) SSConfigMapBuilder
        SetPressure(string) SSConfigMapBuilder
+       SetVerify(string) SSConfigMapBuilder
 }
 
 type configmapBuilder struct {
@@ -75,6 +77,11 @@ func (c *configmapBuilder) SetPressure(cmd string) 
SSConfigMapBuilder {
        return c
 }
 
+func (c *configmapBuilder) SetVerify(cmd string) SSConfigMapBuilder {
+       c.configmap.Data[configVerify] = cmd
+       return c
+}
+
 // defaultConfigMap returns a ConfigMap filling with default expected values
 func defaultConfigMap() *v1.ConfigMap {
        return &v1.ConfigMap{
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go 
b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
index 9f0ac74..273a8a9 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
@@ -30,12 +30,16 @@ import (
 )
 
 const (
-       DefaultImageName     = "agoiyanzsa/tools-runtime:1.0"
+       DefaultImageName     = "agoiyanzsa/tools-runtime:2.0"
        DefaultContainerName = "tools-runtime"
        DefaultWorkPath      = "/app/start"
        DefaultConfigName    = "cmd-conf"
 )
 
+var (
+       DefaultTTLSecondsAfterFinished int32 = 300
+)
+
 var DefaultFileMode int32 = 493
 
 const (
@@ -52,11 +56,16 @@ type InjectRequirement string
 var (
        Experimental InjectRequirement = "experimental"
        Pressure     InjectRequirement = "pressure"
+       Verify       InjectRequirement = "verify"
 )
 
+func SetJobNamespaceName(name string, requirement InjectRequirement) string {
+       return fmt.Sprintf("%s-%s", name, string(requirement))
+}
+
 func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement 
InjectRequirement) (*v1.Job, error) {
        jbd := NewJobBuilder()
-       
jbd.SetNamespace(ssChaos.Namespace).SetLabels(ssChaos.Labels).SetName(ssChaos.Name)
+       
jbd.SetNamespace(ssChaos.Namespace).SetLabels(ssChaos.Labels).SetName(SetJobNamespaceName(ssChaos.Name,
 requirement))
 
        if v, ok := ssChaos.Annotations[completions]; ok {
                value, err := MustInt32(v)
@@ -117,11 +126,10 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, 
requirement InjectRequirement
 
        vm := &corev1.VolumeMount{Name: DefaultConfigName, MountPath: 
DefaultWorkPath}
        cbd := common.NewContainerBuilder()
-       //todo: replace as DefaultImageName
        cbd.SetImage(DefaultImageName)
        cbd.SetName(DefaultContainerName)
        cbd.SetVolumeMount(vm)
-       cbd.SetCommand([]string{"sh"})
+       cbd.SetCommand([]string{"sh", "-c"})
        container := cbd.Build()
        container.Args = NewCmds(requirement)
        jbd.SetContainers(container)
@@ -131,12 +139,14 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, 
requirement InjectRequirement
 
 func NewCmds(requirement InjectRequirement) []string {
        var cmds []string
-       cmds = append(cmds, "-c")
        if requirement == Experimental {
                cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, 
configExperimental))
        }
        if requirement == Pressure {
-               cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, 
configExperimental), fmt.Sprintf("%s/%s", DefaultWorkPath, configPressure))
+               cmds = append(cmds, fmt.Sprintf("%s/%s;%s/%s", DefaultWorkPath, 
configPressure, DefaultWorkPath, configExperimental))
+       }
+       if requirement == Verify {
+               cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, 
configVerify))
        }
        return cmds
 }
@@ -149,17 +159,17 @@ func MustInt32(s string) (int32, error) {
        return int32(v), nil
 }
 
-func UpdateJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement 
InjectRequirement, cur *v1.Job) (*v1.Job, error) {
+func IsJobChanged(ssChaos *v1alpha1.ShardingSphereChaos, requirement 
InjectRequirement, cur *v1.Job) (bool, error) {
        now, err := NewJob(ssChaos, requirement)
        if err != nil {
-               return nil, err
+               return false, err
        }
        isEqual := judgeJobEqual(cur, now)
        if isEqual {
-               return nil, nil
+               return true, nil
        }
 
-       return now, nil
+       return false, nil
 }
 
 func judgeJobEqual(now *v1.Job, exp *v1.Job) bool {
@@ -194,10 +204,15 @@ func judgeJobConfigEqual(now *v1.Job, exp *v1.Job) bool {
        return true
 }
 func judgeTTLSecondsAfterFinished(cur *int32, exp *int32) bool {
-       if exp != nil && *cur != *exp {
-               return false
+       if cur == nil && exp == nil {
+               return true
        }
-       return true
+       if cur != nil && exp != nil {
+               if *cur == *exp {
+                       return true
+               }
+       }
+       return false
 }
 func judgeActiveDeadlineSeconds(cur *int64, exp *int64) bool {
        if exp != nil && *cur != *exp {
@@ -212,6 +227,9 @@ func judgeContainerEqual(now *corev1.Container, exp 
*corev1.Container) bool {
        if !reflect.DeepEqual(now.Command, exp.Command) {
                return false
        }
+       if !reflect.DeepEqual(now.Args, exp.Args) {
+               return false
+       }
        if now.Image != exp.Image {
                return false
        }
@@ -300,7 +318,8 @@ func (j *jobBuilder) SetContainers(container 
*corev1.Container) JobBuilder {
 }
 
 func (j *jobBuilder) SetTTLSecondsAfterFinished(i int32) JobBuilder {
-       j.job.Spec.TTLSecondsAfterFinished = &i
+       ret := i
+       j.job.Spec.TTLSecondsAfterFinished = &ret
        return j
 }
 
diff --git 
a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
 
b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
index 33a6a42..cd10e4f 100644
--- 
a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
+++ 
b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
@@ -26,3 +26,10 @@ func TestShardingSphereChaos(t *testing.T) {
        RegisterFailHandler(Fail)
        RunSpecs(t, "ShardingSphereChaos Suite")
 }
+
+//func installChaosMesh() {
+//     var (
+//             installCmd = "curl -sSL 
https://mirrors.chaos-mesh.org/v2.5.1/install.sh | bash -s -- --docker-mirror"
+//     )
+//
+//}

Reply via email to