This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new d95b2cd  fix(chaos): add IT mocks and fix chaos bugs
     new 36e311a  Merge pull request #364 from moomman/add-ssChaos-test
d95b2cd is described below

commit d95b2cd714e7bd1e139021d1445c0fa8be801f3b
Author: moonman <[email protected]>
AuthorDate: Thu May 18 14:05:15 2023 +0800

    fix(chaos): add IT mocks and fix chaos bugs
---
 .../api/v1alpha1/shardingsphere_chaos_types.go     |   9 +-
 .../api/v1alpha1/zz_generated.deepcopy.go          |  13 +-
 .../cmd/shardingsphere-operator/manager/option.go  |  22 +-
 .../controllers/shardingsphere_chaos_controller.go | 264 +++++++++++-------
 .../shardingsphere_chaos_controller_test.go        | 298 +++++++++++++++++++++
 .../pkg/kubernetes/chaosmesh/builder.go            | 122 ++++-----
 .../pkg/kubernetes/chaosmesh/chaosmesh.go          |  22 +-
 .../pkg/kubernetes/chaosmesh/mocks/store.go        |   1 +
 .../pkg/kubernetes/configmap/builders.go           |   1 +
 shardingsphere-operator/pkg/pressure/pressure.go   |  72 ++---
 .../pkg/reconcile/shardingspherechaos/job.go       |   1 +
 shardingsphere-operator/test/e2e/e2e_suite_test.go |  33 +--
 .../e2e/shardingsphere_chaos_controller_test.go    | 138 +++++++---
 13 files changed, 717 insertions(+), 279 deletions(-)

diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go 
b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index 43431e4..f5321a5 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -59,7 +59,7 @@ type PressureCfg struct {
 
 type DistSQL struct {
        SQL  string   `json:"sql"`
-       Args []string `json:"args"`
+       Args []string `json:"args,omitempty"`
 }
 
 type Script string
@@ -94,9 +94,10 @@ const (
 
 // ShardingSphereChaosStatus defines the actual state of ShardingSphereChaos
 type ShardingSphereChaosStatus struct {
-       ChaosCondition ChaosCondition `json:"chaosCondition"`
-       Phase          ChaosPhase     `json:"phase"`
-       Result         Result         `json:"result"`
+       ChaosCondition ChaosCondition      `json:"chaosCondition"`
+       Phase          ChaosPhase          `json:"phase"`
+       Result         Result              `json:"result"`
+       Conditions     []*metav1.Condition `json:"condition,omitempty"`
 }
 
 // Result represents the result of the ShardingSphereChaos
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go 
b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index f2e6b08..48efd64 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -1407,7 +1407,7 @@ func (in *ShardingSphereChaos) DeepCopyInto(out 
*ShardingSphereChaos) {
        out.TypeMeta = in.TypeMeta
        in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
        in.Spec.DeepCopyInto(&out.Spec)
-       out.Status = in.Status
+       in.Status.DeepCopyInto(&out.Status)
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new ShardingSphereChaos.
@@ -1482,6 +1482,17 @@ func (in *ShardingSphereChaosSpec) DeepCopy() 
*ShardingSphereChaosSpec {
 func (in *ShardingSphereChaosStatus) DeepCopyInto(out 
*ShardingSphereChaosStatus) {
        *out = *in
        out.Result = in.Result
+       if in.Conditions != nil {
+               in, out := &in.Conditions, &out.Conditions
+               *out = make([]*metav1.Condition, len(*in))
+               for i := range *in {
+                       if (*in)[i] != nil {
+                               in, out := &(*in)[i], &(*out)[i]
+                               *out = new(metav1.Condition)
+                               (*in).DeepCopyInto(*out)
+                       }
+               }
+       }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new ShardingSphereChaosStatus.
diff --git 
a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go 
b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index ded3c83..d25f814 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -21,8 +21,6 @@ import (
        "flag"
        "strings"
 
-       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
-
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
        sschaos 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
@@ -94,7 +92,7 @@ func ParseOptionsFromCmdFlags() *Options {
        flag.BoolVar(&opt.LeaderElection, "leader-elect", false,
                "Enable leader election for controller manager. "+
                        "Enabling this will ensure there is only one active 
controller manager.")
-       flag.StringVar(&opt.FeatureGates, "feature-gates", 
"ShardingSphereChaos=true", "A set of key=value pairs that describe feature 
gates for alpha/experimental features.")
+       flag.StringVar(&opt.FeatureGates, "feature-gates", "", "A set of 
key=value pairs that describe feature gates for alpha/experimental features.")
        // aws client options
        flag.StringVar(&AwsAccessKeyID, "aws-access-key-id", "", "The AWS 
access key ID.")
        flag.StringVar(&AwsSecretAccessKey, "aws-secret-key", "", "The AWS 
secret access key.")
@@ -171,15 +169,15 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
                        return err
                }
                if err := (&controllers.ShardingSphereChaosReconciler{
-                       Client:       mgr.GetClient(),
-                       Scheme:       mgr.GetScheme(),
-                       Log:          mgr.GetLogger(),
-                       Chaos:        sschaos.NewChaos(mgr.GetClient()),
-                       Job:          job.NewJob(mgr.GetClient()),
-                       ExecRecorder: make([]*pressure.Pressure, 0),
-                       ConfigMap:    
configmap.NewConfigMapClient(mgr.GetClient()),
-                       Events:       
mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
-                       ClientSet:    clientset,
+                       Client:    mgr.GetClient(),
+                       Scheme:    mgr.GetScheme(),
+                       Log:       mgr.GetLogger(),
+                       Chaos:     sschaos.NewChaos(mgr.GetClient()),
+                       Job:       job.NewJob(mgr.GetClient()),
+                       ExecCtrls: make([]*controllers.ExecCtrl, 0),
+                       ConfigMap: 
configmap.NewConfigMapClient(mgr.GetClient()),
+                       Events:    
mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
+                       ClientSet: clientset,
                }).SetupWithManager(mgr); err != nil {
                        logger.Error(err, "unable to create controller", 
"controller", "ShardingSphereChaos")
                        return err
diff --git 
a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go 
b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 089abfe..c512ccf 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -19,29 +19,29 @@ package controllers
 
 import (
        "context"
-       "errors"
        "fmt"
-
-       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
-
-       "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+       "reflect"
+       "time"
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
-       sschaos 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/job"
-       reconcile 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/shardingspherechaos"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
+       sschaos 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/shardingspherechaos"
 
        "github.com/go-logr/logr"
        batchV1 "k8s.io/api/batch/v1"
        corev1 "k8s.io/api/core/v1"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/types"
        clientset "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/record"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
+       "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 )
 
 const (
@@ -49,17 +49,6 @@ const (
        SSChaosFinalizeName               = 
"shardingsphere.apache.org/finalizer"
 )
 
-type JobCondition string
-
-var (
-       CompleteJob JobCondition = "complete"
-       FailureJob  JobCondition = "failure"
-       SuspendJob  JobCondition = "suspend"
-       ActiveJob   JobCondition = "active"
-
-       ErrNoPod = errors.New("no pod in list")
-)
-
 // ShardingSphereChaosReconciler is a controller for the ShardingSphereChaos
 type ShardingSphereChaosReconciler struct {
        client.Client
@@ -69,10 +58,16 @@ type ShardingSphereChaosReconciler struct {
        Events    record.EventRecorder
        ClientSet *clientset.Clientset
 
-       Chaos        sschaos.Chaos
-       Job          job.Job
-       ExecRecorder []*pressure.Pressure
-       ConfigMap    configmap.ConfigMap
+       Chaos     chaosmesh.Chaos
+       Job       job.Job
+       ExecCtrls []*ExecCtrl
+       ConfigMap configmap.ConfigMap
+}
+
+type ExecCtrl struct {
+       cancel   context.CancelFunc
+       pressure *pressure.Pressure
+       ctx      context.Context
 }
 
 // Reconcile handles main function of this controller
@@ -82,48 +77,43 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx 
context.Context, req ctrl.
        ssChaos, err := r.getRuntimeChaos(ctx, req.NamespacedName)
 
        if err != nil {
-               if apierrors.IsNotFound(err) {
-                       return ctrl.Result{RequeueAfter: defaultRequeueTime}, 
nil
-               }
-
-               logger.Error(err, "failed to get the shardingsphere chaos")
-               return ctrl.Result{Requeue: true}, err
+               return ctrl.Result{}, client.IgnoreNotFound(err)
        }
 
-       if err := r.finalize(ctx, ssChaos); err != nil {
-               return ctrl.Result{Requeue: true}, err
+       logger.Info("start reconcile shardingspherechaos")
+       if ssChaos.ObjectMeta.DeletionTimestamp.IsZero() {
+               if !controllerutil.ContainsFinalizer(ssChaos, 
SSChaosFinalizeName) {
+                       controllerutil.AddFinalizer(ssChaos, 
SSChaosFinalizeName)
+                       if err := r.Update(ctx, ssChaos); err != nil {
+                               return ctrl.Result{}, err
+                       }
+               }
+       } else if controllerutil.ContainsFinalizer(ssChaos, 
SSChaosFinalizeName) {
+               return r.finalize(ctx, ssChaos)
        }
 
-       logger.Info("start reconcile chaos")
-
-       //TODO: consider merge these events
        var errors []error
        if err := r.reconcileChaos(ctx, ssChaos); err != nil {
-               if err != nil {
-                       errors = append(errors, err)
-               }
+               errors = append(errors, err)
+
                logger.Error(err, "reconcile shardingspherechaos error")
                r.Events.Event(ssChaos, "Warning", "shardingspherechaos error", 
err.Error())
        }
 
        if err := r.reconcileConfigMap(ctx, ssChaos); err != nil {
-               if err != nil {
-                       errors = append(errors, err)
-               }
+               errors = append(errors, err)
+
                logger.Error(err, "reconcile configmap error")
                r.Events.Event(ssChaos, "Warning", "configmap error", 
err.Error())
        }
 
        if err := r.reconcilePressure(ctx, ssChaos); err != nil {
-               if err != nil {
-                       errors = append(errors, err)
-               }
+               errors = append(errors, err)
        }
 
        if err := r.reconcileStatus(ctx, ssChaos); err != nil {
-               if err != nil {
-                       errors = append(errors, err)
-               }
+               errors = append(errors, err)
+
                logger.Error(err, "failed to update status")
        }
        if len(errors) > 0 {
@@ -133,42 +123,73 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx 
context.Context, req ctrl.
 }
 
 func (r *ShardingSphereChaosReconciler) reconcilePressure(ctx context.Context, 
chao *v1alpha1.ShardingSphereChaos) error {
+
+       if chao.Status.Phase == "" {
+               return nil
+       }
        exec := r.getNeedExec(chao)
 
        //if exec in this phase do not exist,create it
        if exec == nil {
                exec := pressure.NewPressure(getExecName(chao), 
chao.Spec.PressureCfg.DistSQLs)
-               go exec.Run(ctx, &chao.Spec.PressureCfg)
-               r.ExecRecorder = append(r.ExecRecorder, exec)
+
+               //we need to set active to true,prevent it start after we start 
reconcile status
+               exec.Active = true
+               execCtx, cancel := context.WithCancel(ctx)
+               execCtrl := &ExecCtrl{
+                       cancel:   cancel,
+                       pressure: exec,
+                       ctx:      execCtx,
+               }
+
+               go exec.Run(execCtx, &chao.Spec.PressureCfg)
+               r.ExecCtrls = append(r.ExecCtrls, execCtrl)
        }
 
        return nil
 }
 
 func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, 
chaos *v1alpha1.ShardingSphereChaos) error {
-       namespacedName := types.NamespacedName{
-               Name:      chaos.Name,
-               Namespace: chaos.Namespace,
-       }
+
+       cur := chaos.Status.DeepCopy()
 
        setDefaultStatus(chaos)
+
+       updateCondition(chaos)
+
        r.updatePhaseExec(chaos)
 
        if err := r.updateChaosCondition(ctx, chaos); err != nil {
                return err
        }
 
-       rt, err := r.getRuntimeChaos(ctx, namespacedName)
-       if err != nil {
-               return err
+       if reflect.DeepEqual(cur, chaos.Status) {
+
+               return nil
        }
-       rt.Status = chaos.Status
 
-       return r.Status().Update(ctx, rt)
+       return r.Status().Update(ctx, chaos)
+}
+
+func updateCondition(chaos *v1alpha1.ShardingSphereChaos) {
+       phase := chaos.Status.Phase
+
+       for i := range chaos.Status.Conditions {
+               condition := chaos.Status.Conditions[i]
+               if string(phase) == condition.Type {
+                       if condition.Status == v1alpha1.ConditionStatusFalse {
+                               condition.Status = v1alpha1.ConditionStatusTrue
+                               condition.LastTransitionTime = 
metav1.Time{Time: time.Now()}
+                       }
+                       return
+               }
+       }
 }
 
 func (r *ShardingSphereChaosReconciler) updatePhaseExec(chaos 
*v1alpha1.ShardingSphereChaos) {
        exec := r.getNeedExec(chaos)
+
+       //because the goroutine asynchronous,we cant check it start immediately 
or not
        if exec == nil || exec.Active {
                return
        }
@@ -176,28 +197,36 @@ func (r *ShardingSphereChaosReconciler) 
updatePhaseExec(chaos *v1alpha1.Sharding
        //todo: judge error
 
        msg := generateMsgFromExec(exec)
+       var nextPhase v1alpha1.ChaosPhase
        //when exec finished, update phase
        switch chaos.Status.Phase {
        case v1alpha1.BeforeSteady:
+               nextPhase = v1alpha1.AfterSteady
+       case v1alpha1.AfterSteady:
                chaos.Status.Result.Steady = *msg
-               chaos.Status.Phase = v1alpha1.BeforeChaos
+               //todo: add metrics
+
+               nextPhase = v1alpha1.BeforeChaos
        case v1alpha1.BeforeChaos:
                chaos.Status.Result.Chaos = *msg
-               chaos.Status.Phase = v1alpha1.AfterChaos
+               //todo: add metrics
+
+               nextPhase = v1alpha1.AfterChaos
+       //case v1alpha1.AfterChaos:
+       //      //todo: check result here
+       //      return
+       default:
+               return
        }
 
+       chaos.Status.Phase = nextPhase
 }
 
 func generateMsgFromExec(exec *pressure.Pressure) *v1alpha1.Msg {
        //todo: wait to change result compute way
-       rate := 0
-       if exec.Result.Total == 0 {
-               rate = 0
-       } else {
-               rate = exec.Result.Success / exec.Result.Total
-       }
+
        msg := v1alpha1.Msg{
-               Result:   fmt.Sprintf("%d", rate),
+               Result:   fmt.Sprintf("%d/%d", exec.Result.Success, 
exec.Result.Total),
                Duration: exec.Result.Duration.String(),
        }
        if exec.Err != nil {
@@ -209,23 +238,29 @@ func generateMsgFromExec(exec *pressure.Pressure) 
*v1alpha1.Msg {
 
 func getExecName(chao *v1alpha1.ShardingSphereChaos) string {
        var execName string
+       nameSpacedName := types.NamespacedName{Namespace: chao.Namespace, Name: 
chao.Name}
+
        if chao.Status.Phase == v1alpha1.BeforeSteady || chao.Status.Phase == 
v1alpha1.AfterSteady {
-               execName = reconcile.MakeJobName(chao.Name, reconcile.InSteady)
+               execName = makeExecName(nameSpacedName, 
string(sschaos.InSteady))
        }
        if chao.Status.Phase == v1alpha1.BeforeChaos || chao.Status.Phase == 
v1alpha1.AfterChaos {
-               execName = reconcile.MakeJobName(chao.Name, reconcile.InChaos)
+               execName = makeExecName(nameSpacedName, string(sschaos.InChaos))
        }
 
        return execName
 }
 
+func makeExecName(namespacedName types.NamespacedName, execType string) string 
{
+       return fmt.Sprintf("%s-%s-%s", namespacedName.Namespace, 
namespacedName.Name, execType)
+}
+
 func (r *ShardingSphereChaosReconciler) getNeedExec(chao 
*v1alpha1.ShardingSphereChaos) *pressure.Pressure {
        jobName := getExecName(chao)
 
        //if pressure do not exist,run it
-       for i := range r.ExecRecorder {
-               if r.ExecRecorder[i].Name == jobName {
-                       return r.ExecRecorder[i]
+       for i := range r.ExecCtrls {
+               if r.ExecCtrls[i].pressure.Name == jobName {
+                       return r.ExecCtrls[i].pressure
                }
        }
 
@@ -239,26 +274,22 @@ func (r *ShardingSphereChaosReconciler) 
getRuntimeChaos(ctx context.Context, nam
 }
 
 // nolint:nestif
-func (r *ShardingSphereChaosReconciler) finalize(ctx context.Context, chao 
*v1alpha1.ShardingSphereChaos) error {
-       if chao.ObjectMeta.DeletionTimestamp.IsZero() {
-               if !controllerutil.ContainsFinalizer(chao, SSChaosFinalizeName) 
{
-                       controllerutil.AddFinalizer(chao, SSChaosFinalizeName)
-                       if err := r.Update(ctx, chao); err != nil {
-                               return err
-                       }
-               }
-       } else if controllerutil.ContainsFinalizer(chao, SSChaosFinalizeName) {
-               if err := r.deleteExternalResources(ctx, chao); err != nil {
-                       return err
-               }
+func (r *ShardingSphereChaosReconciler) finalize(ctx context.Context, ssChaos 
*v1alpha1.ShardingSphereChaos) (ctrl.Result, error) {
+       namespacedName := types.NamespacedName{
+               Namespace: ssChaos.Namespace,
+               Name:      ssChaos.Name,
+       }
+       r.deleteExec(namespacedName)
+       if err := r.deleteExternalResources(ctx, ssChaos); err != nil {
+               return ctrl.Result{}, err
+       }
 
-               controllerutil.RemoveFinalizer(chao, SSChaosFinalizeName)
-               if err := r.Update(ctx, chao); err != nil {
-                       return err
-               }
+       controllerutil.RemoveFinalizer(ssChaos, SSChaosFinalizeName)
+       if err := r.Update(ctx, ssChaos); err != nil {
+               return ctrl.Result{}, err
        }
 
-       return nil
+       return ctrl.Result{}, nil
 }
 
 func (r *ShardingSphereChaosReconciler) deleteExternalResources(ctx 
context.Context, chao *v1alpha1.ShardingSphereChaos) error {
@@ -282,6 +313,20 @@ func (r *ShardingSphereChaosReconciler) 
deleteExternalResources(ctx context.Cont
        return nil
 }
 
+func (r *ShardingSphereChaosReconciler) deleteExec(namespacedName 
types.NamespacedName) {
+       steady, chaos := makeExecName(namespacedName, 
string(sschaos.InSteady)), makeExecName(namespacedName, string(sschaos.InChaos))
+       execR := make([]*ExecCtrl, 0, len(r.ExecCtrls))
+       for i := range r.ExecCtrls {
+               exec := r.ExecCtrls[i].pressure
+               if exec.Name == steady || exec.Name == chaos {
+                       r.ExecCtrls[i].cancel()
+                       continue
+               }
+               execR = append(execR, r.ExecCtrls[i])
+       }
+       r.ExecCtrls = execR
+}
+
 func (r *ShardingSphereChaosReconciler) deletePodChaos(ctx context.Context, 
namespacedName types.NamespacedName) error {
        podchao, err := r.getPodChaosByNamespacedName(ctx, namespacedName)
        if err != nil {
@@ -316,7 +361,6 @@ func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx 
context.Context, chao
        if chaos.Status.Phase == "" || chaos.Status.Phase == 
v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
                return nil
        }
-
        namespacedName := types.NamespacedName{
                Namespace: chaos.Namespace,
                Name:      chaos.Name,
@@ -351,7 +395,7 @@ func (r *ShardingSphereChaosReconciler) 
reconcilePodChaos(ctx context.Context, c
        return r.createPodChaos(ctx, chaos)
 }
 
-func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (sschaos.PodChaos, error) 
{
+func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (chaosmesh.PodChaos, 
error) {
        pc, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName)
        if err != nil {
                return nil, err
@@ -368,7 +412,7 @@ func (r *ShardingSphereChaosReconciler) createPodChaos(ctx 
context.Context, chao
        return nil
 }
 
-func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context, 
chaos *v1alpha1.ShardingSphereChaos, podChaos sschaos.PodChaos) error {
+func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context, 
chaos *v1alpha1.ShardingSphereChaos, podChaos chaosmesh.PodChaos) error {
        err := r.Chaos.UpdatePodChaos(ctx, podChaos, chaos)
        if err != nil {
                return err
@@ -389,7 +433,7 @@ func (r *ShardingSphereChaosReconciler) 
reconcileNetworkChaos(ctx context.Contex
        return r.createNetworkChaos(ctx, chaos)
 }
 
-func (r *ShardingSphereChaosReconciler) updateNetWorkChaos(ctx 
context.Context, chaos *v1alpha1.ShardingSphereChaos, networkChaos 
sschaos.NetworkChaos) error {
+func (r *ShardingSphereChaosReconciler) updateNetWorkChaos(ctx 
context.Context, chaos *v1alpha1.ShardingSphereChaos, networkChaos 
chaosmesh.NetworkChaos) error {
        err := r.Chaos.UpdateNetworkChaos(ctx, networkChaos, chaos)
        if err != nil {
                return err
@@ -435,9 +479,39 @@ func (r *ShardingSphereChaosReconciler) 
reconcileConfigMap(ctx context.Context,
 }
 
 func setDefaultStatus(chaos *v1alpha1.ShardingSphereChaos) {
+
        if chaos.Status.Phase == "" {
                chaos.Status.Phase = v1alpha1.BeforeSteady
        }
+
+       if len(chaos.Status.Conditions) == 0 {
+               chaos.Status.Conditions = []*metav1.Condition{
+                       {
+                               Type:               
string(v1alpha1.BeforeSteady),
+                               Status:             metav1.ConditionTrue,
+                               LastTransitionTime: metav1.Time{Time: 
time.Now()},
+                               Reason:             "InSteadyExperiment",
+                       },
+                       {
+                               Type:               
string(v1alpha1.AfterSteady),
+                               Status:             metav1.ConditionFalse,
+                               LastTransitionTime: metav1.Time{Time: 
time.Now()},
+                               Reason:             "AfterSteadyExperiment",
+                       },
+                       {
+                               Type:               
string(v1alpha1.BeforeChaos),
+                               Status:             metav1.ConditionFalse,
+                               LastTransitionTime: metav1.Time{Time: 
time.Now()},
+                               Reason:             "InChaoExperiment",
+                       },
+                       {
+                               Type:               string(v1alpha1.AfterChaos),
+                               Status:             metav1.ConditionFalse,
+                               LastTransitionTime: metav1.Time{Time: 
time.Now()},
+                               Reason:             "AfterChaosExperiment",
+                       },
+               }
+       }
 }
 
 func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx 
context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
@@ -451,7 +525,7 @@ func (r *ShardingSphereChaosReconciler) 
updateChaosCondition(ctx context.Context
                if err != nil {
                        return err
                }
-               chaos.Status.ChaosCondition = sschaos.ConvertChaosStatus(ctx, 
chaos, pc)
+               chaos.Status.ChaosCondition = chaosmesh.ConvertChaosStatus(ctx, 
chaos, pc)
        }
 
        if chaos.Spec.EmbedChaos.NetworkChaos != nil {
@@ -459,13 +533,13 @@ func (r *ShardingSphereChaosReconciler) 
updateChaosCondition(ctx context.Context
                if err != nil {
                        return err
                }
-               chaos.Status.ChaosCondition = sschaos.ConvertChaosStatus(ctx, 
chaos, nc)
+               chaos.Status.ChaosCondition = chaosmesh.ConvertChaosStatus(ctx, 
chaos, nc)
        }
 
        return nil
 }
 
-func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (sschaos.NetworkChaos, 
error) {
+func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx 
context.Context, namespacedName types.NamespacedName) (chaosmesh.NetworkChaos, 
error) {
        nc, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName)
        if err != nil {
                return nil, err
@@ -483,7 +557,7 @@ func (r *ShardingSphereChaosReconciler) 
getConfigMapByNamespacedName(ctx context
 }
 
 func (r *ShardingSphereChaosReconciler) updateConfigMap(ctx context.Context, 
chaos *v1alpha1.ShardingSphereChaos, cur *corev1.ConfigMap) error {
-       // exp := reconcile.UpdateShardingSphereChaosConfigMap(chao, cur)
+       // exp := sschaos.UpdateShardingSphereChaosConfigMap(chao, cur)
        exp := r.ConfigMap.Build(ctx, chaos)
        exp.ObjectMeta = cur.ObjectMeta
        exp.ObjectMeta.ResourceVersion = ""
diff --git 
a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller_test.go
 
b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller_test.go
new file mode 100644
index 0000000..0fe1ef3
--- /dev/null
+++ 
b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller_test.go
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controllers
+
+import (
+       "context"
+       "database/sql"
+       "regexp"
+       "time"
+
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+       mockChaos 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
+       reconcile 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/shardingspherechaos"
+
+       "bou.ke/monkey"
+       "github.com/DATA-DOG/go-sqlmock"
+       "github.com/golang/mock/gomock"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
+       clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+       "k8s.io/client-go/tools/record"
+       ctrl "sigs.k8s.io/controller-runtime"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+       "sigs.k8s.io/controller-runtime/pkg/client/fake"
+       logf "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+func mockchaosStub(chaos *mockChaos.MockChaos) {
+       chaos.EXPECT().NewNetworkChaos(gomock.Any(), 
gomock.Any()).Return(gomock.Any()).AnyTimes()
+       chaos.EXPECT().NewPodChaos(gomock.Any(), 
gomock.Any()).Return(gomock.Any()).AnyTimes()
+
+       chaos.EXPECT().CreatePodChaos(gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+       chaos.EXPECT().CreateNetworkChaos(gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+
+       chaos.EXPECT().DeleteNetworkChaos(gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+       chaos.EXPECT().DeletePodChaos(gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+
+       chaos.EXPECT().UpdatePodChaos(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+       chaos.EXPECT().UpdateNetworkChaos(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+
+       chaos.EXPECT().GetNetworkChaosByNamespacedName(gomock.Any(), 
gomock.Any()).Return(gomock.Any(), nil).AnyTimes()
+       chaos.EXPECT().GetPodChaosByNamespacedName(gomock.Any(), 
gomock.Any()).Return(gomock.Any(), nil).AnyTimes()
+}
+
+func mockDBStub(mock sqlmock.Sqlmock) {
+       mock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE 
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+}
+
+var _ = Describe("shardingsphere mock test", func() {
+       var (
+               testNamespacedName = types.NamespacedName{
+                       Namespace: "test-ssChaos-namespace",
+                       Name:      "test-ssChaos-name",
+               }
+               duration = "30s"
+       )
+       var (
+               ctx        = context.TODO()
+               fakeClient client.Client
+               reconciler *ShardingSphereChaosReconciler
+               mockCtrl   *gomock.Controller
+               mockchaos  *mockChaos.MockChaos
+               db         *sql.DB
+       )
+
+       BeforeEach(func() {
+               scheme := runtime.NewScheme()
+               Expect(clientgoscheme.AddToScheme(scheme)).To(Succeed())
+               Expect(v1alpha1.AddToScheme(scheme)).To(Succeed())
+               fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build()
+               mockCtrl = gomock.NewController(GinkgoT())
+
+               mockchaos = mockChaos.NewMockChaos(mockCtrl)
+               mockchaosStub(mockchaos)
+
+               reconciler = &ShardingSphereChaosReconciler{
+                       Client:    fakeClient,
+                       Scheme:    scheme,
+                       Log:       logf.Log,
+                       Events:    record.NewFakeRecorder(100),
+                       Chaos:     mockchaos,
+                       ExecCtrls: make([]*ExecCtrl, 0),
+                       ConfigMap: configmap.NewConfigMapClient(fakeClient),
+               }
+
+               var (
+                       dbmock sqlmock.Sqlmock
+                       err    error
+               )
+
+               db, dbmock, err = sqlmock.New()
+               Expect(err).To(BeNil())
+               Expect(db).NotTo(BeNil())
+               Expect(dbmock).NotTo(BeNil())
+
+               monkey.Patch(sql.Open, func(driverName, dataSourceName string) 
(*sql.DB, error) {
+                       return db, nil
+               })
+
+               mockDBStub(dbmock)
+       })
+
+       AfterEach(func() {
+               monkey.UnpatchAll()
+               db.Close()
+       })
+
+       Context("create shardingsphere chaos", func() {
+               It("should create successfully", func() {
+                       ssChaos := &v1alpha1.ShardingSphereChaos{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      testNamespacedName.Name,
+                                       Namespace: testNamespacedName.Namespace,
+                               },
+                               Spec: v1alpha1.ShardingSphereChaosSpec{
+                                       EmbedChaos: v1alpha1.EmbedChaos{
+                                               PodChaos: 
&v1alpha1.PodChaosSpec{
+                                                       PodSelector: 
v1alpha1.PodSelector{
+                                                               LabelSelectors: 
map[string]string{
+                                                                       
"app.kubernetes.io/component": "zookeeper",
+                                                               },
+                                                       },
+                                                       Action: 
v1alpha1.PodFailure,
+                                                       Params: 
v1alpha1.PodChaosParams{
+                                                               PodFailure: 
&v1alpha1.PodFailureParams{
+                                                                       
Duration: &duration,
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       PressureCfg: v1alpha1.PressureCfg{
+                                               SsHost:   "127.0.0.1:3306/ds_1",
+                                               Duration: 
metav1.Duration{Duration: 30 * time.Second},
+                                               ReqTime:  
metav1.Duration{Duration: 30 * time.Second},
+                                               DistSQLs: []v1alpha1.DistSQL{
+                                                       {
+                                                               SQL:  "REGISTER 
STORAGE UNIT ?()",
+                                                               Args: 
[]string{"ds_1"},
+                                                       },
+                                               },
+                                               ConcurrentNum: 2,
+                                               ReqNum:        5,
+                                       },
+                               },
+                               Status: v1alpha1.ShardingSphereChaosStatus{},
+                       }
+
+                       Expect(fakeClient.Create(ctx, 
ssChaos)).Should(Succeed())
+                       chaos := &v1alpha1.ShardingSphereChaos{}
+                       _, err := reconciler.Reconcile(ctx, 
ctrl.Request{NamespacedName: testNamespacedName})
+                       Expect(err).To(BeNil())
+                       Expect(fakeClient.Get(ctx, testNamespacedName, 
chaos)).Should(Succeed())
+                       Expect(fakeClient.Delete(ctx, chaos)).Should(Succeed())
+               })
+       })
+
+       Context("reconcile ssChaos in BeforeSteady phase", func() {
+               It("chaos should be nil,execRecorder should be steady", func() {
+                       ssChaos := &v1alpha1.ShardingSphereChaos{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      testNamespacedName.Name,
+                                       Namespace: testNamespacedName.Namespace,
+                               },
+                               Spec: v1alpha1.ShardingSphereChaosSpec{
+                                       EmbedChaos: v1alpha1.EmbedChaos{
+                                               PodChaos: 
&v1alpha1.PodChaosSpec{
+                                                       PodSelector: 
v1alpha1.PodSelector{
+                                                               LabelSelectors: 
map[string]string{
+                                                                       
"app.kubernetes.io/component": "zookeeper",
+                                                               },
+                                                       },
+                                                       Action: 
v1alpha1.PodFailure,
+                                                       Params: 
v1alpha1.PodChaosParams{
+                                                               PodFailure: 
&v1alpha1.PodFailureParams{
+                                                                       
Duration: &duration,
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       PressureCfg: v1alpha1.PressureCfg{
+                                               SsHost:   "127.0.0.1:3306/ds_1",
+                                               Duration: 
metav1.Duration{Duration: 30 * time.Second},
+                                               ReqTime:  
metav1.Duration{Duration: 30 * time.Second},
+                                               DistSQLs: []v1alpha1.DistSQL{
+                                                       {
+                                                               SQL:  "REGISTER 
STORAGE UNIT ?()",
+                                                               Args: 
[]string{"ds_1"},
+                                                       },
+                                               },
+                                               ConcurrentNum: 2,
+                                               ReqNum:        5,
+                                       },
+                               },
+                               Status: v1alpha1.ShardingSphereChaosStatus{},
+                       }
+
+                       Expect(fakeClient.Create(ctx, 
ssChaos)).Should(Succeed())
+                       for i := 0; i < 5; i++ {
+                               _, err := reconciler.Reconcile(ctx, 
ctrl.Request{NamespacedName: testNamespacedName})
+                               Expect(err).To(BeNil())
+                       }
+
+                       var inSteadyChaos v1alpha1.ShardingSphereChaos
+                       Expect(fakeClient.Get(ctx, testNamespacedName, 
&inSteadyChaos)).Should(Succeed())
+                       
Expect(inSteadyChaos.Status.Phase).To(Equal(v1alpha1.BeforeSteady))
+
+                       Expect(len(reconciler.ExecCtrls)).To(Equal(1))
+                       Expect(fakeClient.Delete(ctx, 
&inSteadyChaos)).Should(Succeed())
+               })
+       })
+
+       Context("reconcile ssChaos in BeforeChaos", func() {
+               It("phase should in beforeChaos,execRecorder should gt 2", 
func() {
+                       ssChaos := &v1alpha1.ShardingSphereChaos{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      testNamespacedName.Name,
+                                       Namespace: testNamespacedName.Namespace,
+                               },
+                               Spec: v1alpha1.ShardingSphereChaosSpec{
+                                       EmbedChaos: v1alpha1.EmbedChaos{
+                                               PodChaos: 
&v1alpha1.PodChaosSpec{
+                                                       PodSelector: 
v1alpha1.PodSelector{
+                                                               LabelSelectors: 
map[string]string{
+                                                                       
"app.kubernetes.io/component": "zookeeper",
+                                                               },
+                                                       },
+                                                       Action: 
v1alpha1.PodFailure,
+                                                       Params: 
v1alpha1.PodChaosParams{
+                                                               PodFailure: 
&v1alpha1.PodFailureParams{
+                                                                       
Duration: &duration,
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       PressureCfg: v1alpha1.PressureCfg{
+                                               SsHost:   "127.0.0.1:3306/ds_1",
+                                               Duration: 
metav1.Duration{Duration: 30 * time.Second},
+                                               ReqTime:  
metav1.Duration{Duration: 30 * time.Second},
+                                               DistSQLs: []v1alpha1.DistSQL{
+                                                       {
+                                                               SQL:  "REGISTER 
STORAGE UNIT ?()",
+                                                               Args: 
[]string{"ds_1"},
+                                                       },
+                                               },
+                                               ConcurrentNum: 2,
+                                               ReqNum:        5,
+                                       },
+                               },
+                               Status: v1alpha1.ShardingSphereChaosStatus{
+                                       Phase: v1alpha1.BeforeChaos,
+                               },
+                       }
+
+                       Expect(fakeClient.Create(ctx, 
ssChaos)).Should(Succeed())
+                       var chao v1alpha1.ShardingSphereChaos
+                       Expect(fakeClient.Get(ctx, testNamespacedName, 
&chao)).Should(Succeed())
+                       steadyExec := 
pressure.NewPressure(reconcile.MakeJobName(ssChaos.Name, reconcile.InSteady), 
ssChaos.Spec.PressureCfg.DistSQLs)
+                       steadyExec.Active = false
+                       execCtx, cancel := context.WithCancel(ctx)
+                       execCtrl := ExecCtrl{
+                               cancel:   cancel,
+                               pressure: steadyExec,
+                               ctx:      execCtx,
+                       }
+                       reconciler.ExecCtrls = append(reconciler.ExecCtrls, 
&execCtrl)
+
+                       for i := 0; i < 10; i++ {
+                               _, err := reconciler.Reconcile(ctx, 
ctrl.Request{NamespacedName: testNamespacedName})
+                               Expect(err).To(BeNil())
+                       }
+                       Expect(len(reconciler.ExecCtrls)).To(Equal(2))
+
+                       var inChaosChaos v1alpha1.ShardingSphereChaos
+                       Expect(fakeClient.Get(ctx, testNamespacedName, 
&inChaosChaos)).Should(Succeed())
+                       
Expect(inChaosChaos.Status.Phase).To(Equal(v1alpha1.BeforeChaos))
+               })
+       })
+})
diff --git a/shardingsphere-operator/pkg/kubernetes/chaosmesh/builder.go 
b/shardingsphere-operator/pkg/kubernetes/chaosmesh/builder.go
index 16ad0fb..3cc44fc 100644
--- a/shardingsphere-operator/pkg/kubernetes/chaosmesh/builder.go
+++ b/shardingsphere-operator/pkg/kubernetes/chaosmesh/builder.go
@@ -24,7 +24,7 @@ import (
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
 
-       chaosv1alpha1 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
+       chaosmeshv1alpha1 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -57,20 +57,20 @@ var (
 type GenericChaos interface{}
 
 func ConvertChaosStatus(ctx context.Context, ssChaos 
*v1alpha1.ShardingSphereChaos, chaos GenericChaos) v1alpha1.ChaosCondition {
-       var status chaosv1alpha1.ChaosStatus
+       var status chaosmeshv1alpha1.ChaosStatus
        if ssChaos.Spec.EmbedChaos.PodChaos != nil {
-               if podChao, ok := chaos.(*chaosv1alpha1.PodChaos); ok && 
podChao != nil {
+               if podChao, ok := chaos.(*chaosmeshv1alpha1.PodChaos); ok && 
podChao != nil {
                        status = *podChao.GetStatus()
                } else {
                        return v1alpha1.Unknown
                }
        } else if ssChaos.Spec.EmbedChaos.NetworkChaos != nil {
-               if networkChaos, ok := chaos.(*chaosv1alpha1.NetworkChaos); ok 
&& networkChaos != nil {
+               if networkChaos, ok := chaos.(*chaosmeshv1alpha1.NetworkChaos); 
ok && networkChaos != nil {
                        status = *networkChaos.GetStatus()
                }
                return v1alpha1.Unknown
        }
-       var conditions = map[chaosv1alpha1.ChaosConditionType]bool{}
+       var conditions = map[chaosmeshv1alpha1.ChaosConditionType]bool{}
        for i := range status.Conditions {
                conditions[status.Conditions[i].Type] = 
status.Conditions[i].Status == corev1.ConditionTrue
        }
@@ -78,21 +78,21 @@ func ConvertChaosStatus(ctx context.Context, ssChaos 
*v1alpha1.ShardingSphereCha
        return judgeCondition(conditions, status.Experiment.DesiredPhase)
 }
 
-func judgeCondition(condition map[chaosv1alpha1.ChaosConditionType]bool, phase 
chaosv1alpha1.DesiredPhase) v1alpha1.ChaosCondition {
+func judgeCondition(condition map[chaosmeshv1alpha1.ChaosConditionType]bool, 
phase chaosmeshv1alpha1.DesiredPhase) v1alpha1.ChaosCondition {
 
-       if condition[chaosv1alpha1.ConditionPaused] {
-               if !condition[chaosv1alpha1.ConditionSelected] {
+       if condition[chaosmeshv1alpha1.ConditionPaused] {
+               if !condition[chaosmeshv1alpha1.ConditionSelected] {
                        return v1alpha1.NoTarget
                }
                return v1alpha1.Paused
        }
 
-       if condition[chaosv1alpha1.ConditionSelected] {
-               if condition[chaosv1alpha1.ConditionAllRecovered] && phase == 
chaosv1alpha1.StoppedPhase {
+       if condition[chaosmeshv1alpha1.ConditionSelected] {
+               if condition[chaosmeshv1alpha1.ConditionAllRecovered] && phase 
== chaosmeshv1alpha1.StoppedPhase {
                        return v1alpha1.AllRecovered
                }
 
-               if condition[chaosv1alpha1.ConditionAllInjected] && phase == 
chaosv1alpha1.RunningPhase {
+               if condition[chaosmeshv1alpha1.ConditionAllInjected] && phase 
== chaosmeshv1alpha1.RunningPhase {
                        return v1alpha1.AllInjected
                }
        }
@@ -107,7 +107,7 @@ func NewPodChaos(ssChao *v1alpha1.ShardingSphereChaos) 
(PodChaos, error) {
        chao := ssChao.Spec.PodChaos
        if act, ok := ssChao.Annotations[podAction]; ok {
                pcb.SetAction(act)
-               if gp, ok := ssChao.Annotations[gracePeriod]; 
chaosv1alpha1.PodChaosAction(act) == chaosv1alpha1.PodKillAction && ok {
+               if gp, ok := ssChao.Annotations[gracePeriod]; 
chaosmeshv1alpha1.PodChaosAction(act) == chaosmeshv1alpha1.PodKillAction && ok {
                        gpInt, err := strconv.ParseInt(gp, 10, 64)
                        if err != nil {
                                return nil, err
@@ -130,7 +130,7 @@ func NewPodChaos(ssChao *v1alpha1.ShardingSphereChaos) 
(PodChaos, error) {
 
        psb.SetSelectMode(ssChao.Annotations[podSelectorMode]).
                SetValue(ssChao.Annotations[podSelectorValue])
-       containerSelector := &chaosv1alpha1.ContainerSelector{
+       containerSelector := &chaosmeshv1alpha1.ContainerSelector{
                PodSelector: *psb.Build(),
        }
 
@@ -194,34 +194,34 @@ func NewNetworkChaos(ssChao 
*v1alpha1.ShardingSphereChaos) (NetworkChaos, error)
        ncb.SetDevice(ssChao.Annotations[device]).
                SetTargetDevice(ssChao.Annotations[targetDevice])
 
-       tcParams := &chaosv1alpha1.TcParameter{}
+       tcParams := &chaosmeshv1alpha1.TcParameter{}
 
        if chao.Action == v1alpha1.Delay {
-               tcParams.Delay = &chaosv1alpha1.DelaySpec{
+               tcParams.Delay = &chaosmeshv1alpha1.DelaySpec{
                        Latency: chao.Params.Delay.Latency,
                        Jitter:  chao.Params.Delay.Jitter,
                }
        }
 
        if chao.Action == v1alpha1.Corruption {
-               tcParams.Corrupt = &chaosv1alpha1.CorruptSpec{
+               tcParams.Corrupt = &chaosmeshv1alpha1.CorruptSpec{
                        Corrupt: chao.Params.Corruption.Corruption,
                }
        }
 
        if chao.Action == v1alpha1.Duplication {
-               tcParams.Duplicate = &chaosv1alpha1.DuplicateSpec{
+               tcParams.Duplicate = &chaosmeshv1alpha1.DuplicateSpec{
                        Duplicate: chao.Params.Duplication.Duplication,
                }
        }
 
        if chao.Action == v1alpha1.Loss {
-               tcParams.Loss = &chaosv1alpha1.LossSpec{
+               tcParams.Loss = &chaosmeshv1alpha1.LossSpec{
                        Loss: chao.Params.Loss.Loss,
                }
        }
 
-       if chaosv1alpha1.NetworkChaosAction(act) == 
chaosv1alpha1.BandwidthAction {
+       if chaosmeshv1alpha1.NetworkChaosAction(act) == 
chaosmeshv1alpha1.BandwidthAction {
                bwab := NewBandWidthActionBuilder()
                if ind1, ok := ssChao.Annotations[rate]; ok {
                        bwab.SetRate(ind1)
@@ -253,11 +253,11 @@ type PodChaosBuilder interface {
        SetName(string) PodChaosBuilder
        SetLabels(map[string]string) PodChaosBuilder
        SetAnnotations(map[string]string) PodChaosBuilder
-       SetContainerSelector(*chaosv1alpha1.ContainerSelector) PodChaosBuilder
+       SetContainerSelector(*chaosmeshv1alpha1.ContainerSelector) 
PodChaosBuilder
        SetAction(string) PodChaosBuilder
        SetDuration(*string) PodChaosBuilder
        SetGracePeriod(int64) PodChaosBuilder
-       Build() *chaosv1alpha1.PodChaos
+       Build() *chaosmeshv1alpha1.PodChaos
 }
 
 func NewPodChaosBuilder() PodChaosBuilder {
@@ -267,7 +267,7 @@ func NewPodChaosBuilder() PodChaosBuilder {
 }
 
 type podChaosBuilder struct {
-       podChaos *chaosv1alpha1.PodChaos
+       podChaos *chaosmeshv1alpha1.PodChaos
 }
 
 type BandWidthActionBuilder interface {
@@ -276,17 +276,17 @@ type BandWidthActionBuilder interface {
        SetBuffer(string) BandWidthActionBuilder
        SetPeakRate(string) BandWidthActionBuilder
        SetMinBurst(string) BandWidthActionBuilder
-       Build() *chaosv1alpha1.BandwidthSpec
+       Build() *chaosmeshv1alpha1.BandwidthSpec
 }
 
 func NewBandWidthActionBuilder() BandWidthActionBuilder {
        return &bandWidthActionBuilder{
-               bandwidth: &chaosv1alpha1.BandwidthSpec{},
+               bandwidth: &chaosmeshv1alpha1.BandwidthSpec{},
        }
 }
 
 type bandWidthActionBuilder struct {
-       bandwidth *chaosv1alpha1.BandwidthSpec
+       bandwidth *chaosmeshv1alpha1.BandwidthSpec
 }
 
 func (b *bandWidthActionBuilder) SetRate(s string) BandWidthActionBuilder {
@@ -328,7 +328,7 @@ func (b *bandWidthActionBuilder) SetMinBurst(s string) 
BandWidthActionBuilder {
        return b
 }
 
-func (b *bandWidthActionBuilder) Build() *chaosv1alpha1.BandwidthSpec {
+func (b *bandWidthActionBuilder) Build() *chaosmeshv1alpha1.BandwidthSpec {
        return b.bandwidth
 }
 
@@ -352,22 +352,22 @@ func (p *podChaosBuilder) SetAnnotations(annotations 
map[string]string) PodChaos
        return p
 }
 
-func (p *podChaosBuilder) SetContainerSelector(selector 
*chaosv1alpha1.ContainerSelector) PodChaosBuilder {
+func (p *podChaosBuilder) SetContainerSelector(selector 
*chaosmeshv1alpha1.ContainerSelector) PodChaosBuilder {
        p.podChaos.Spec.ContainerSelector = *selector
        return p
 }
 
 func (p *podChaosBuilder) SetAction(action string) PodChaosBuilder {
        if v1alpha1.PodChaosAction(action) == v1alpha1.PodFailure {
-               p.podChaos.Spec.Action = chaosv1alpha1.PodFailureAction
+               p.podChaos.Spec.Action = chaosmeshv1alpha1.PodFailureAction
        }
 
        if v1alpha1.PodChaosAction(action) == v1alpha1.ContainerKill {
-               p.podChaos.Spec.Action = chaosv1alpha1.ContainerKillAction
+               p.podChaos.Spec.Action = chaosmeshv1alpha1.ContainerKillAction
        }
 
-       if chaosv1alpha1.PodChaosAction(action) == chaosv1alpha1.PodKillAction {
-               p.podChaos.Spec.Action = chaosv1alpha1.PodKillAction
+       if chaosmeshv1alpha1.PodChaosAction(action) == 
chaosmeshv1alpha1.PodKillAction {
+               p.podChaos.Spec.Action = chaosmeshv1alpha1.PodKillAction
        }
        return p
 }
@@ -388,7 +388,7 @@ func (p *podChaosBuilder) SetGracePeriod(gracePeriod int64) 
PodChaosBuilder {
        return p
 }
 
-func (p *podChaosBuilder) Build() *chaosv1alpha1.PodChaos {
+func (p *podChaosBuilder) Build() *chaosmeshv1alpha1.PodChaos {
        return p.podChaos
 }
 
@@ -397,19 +397,19 @@ type NetworkChaosBuilder interface {
        SetName(string) NetworkChaosBuilder
        SetLabels(map[string]string) NetworkChaosBuilder
        SetAnnotations(map[string]string) NetworkChaosBuilder
-       SetPodSelector(*chaosv1alpha1.PodSelector) NetworkChaosBuilder
+       SetPodSelector(*chaosmeshv1alpha1.PodSelector) NetworkChaosBuilder
        SetAction(string) NetworkChaosBuilder
        SetDevice(string) NetworkChaosBuilder
        SetDuration(*string) NetworkChaosBuilder
        SetDirection(string) NetworkChaosBuilder
-       SetTarget(*chaosv1alpha1.PodSelector) NetworkChaosBuilder
+       SetTarget(*chaosmeshv1alpha1.PodSelector) NetworkChaosBuilder
        SetTargetDevice(string) NetworkChaosBuilder
-       SetTcParameter(chaosv1alpha1.TcParameter) NetworkChaosBuilder
-       Build() *chaosv1alpha1.NetworkChaos
+       SetTcParameter(chaosmeshv1alpha1.TcParameter) NetworkChaosBuilder
+       Build() *chaosmeshv1alpha1.NetworkChaos
 }
 
 type netWorkChaosBuilder struct {
-       netWorkChaos *chaosv1alpha1.NetworkChaos
+       netWorkChaos *chaosmeshv1alpha1.NetworkChaos
 }
 
 func (n *netWorkChaosBuilder) SetNamespace(namespace string) 
NetworkChaosBuilder {
@@ -432,31 +432,31 @@ func (n *netWorkChaosBuilder) SetAnnotations(annotations 
map[string]string) Netw
        return n
 }
 
-func (n *netWorkChaosBuilder) SetPodSelector(selector 
*chaosv1alpha1.PodSelector) NetworkChaosBuilder {
+func (n *netWorkChaosBuilder) SetPodSelector(selector 
*chaosmeshv1alpha1.PodSelector) NetworkChaosBuilder {
        n.netWorkChaos.Spec.PodSelector = *selector
        return n
 }
 
 func (n *netWorkChaosBuilder) SetAction(action string) NetworkChaosBuilder {
        //FIXME
-       if chaosv1alpha1.NetworkChaosAction(action) == 
chaosv1alpha1.BandwidthAction {
-               n.netWorkChaos.Spec.Action = chaosv1alpha1.BandwidthAction
+       if chaosmeshv1alpha1.NetworkChaosAction(action) == 
chaosmeshv1alpha1.BandwidthAction {
+               n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.BandwidthAction
        }
 
        if v1alpha1.NetworkChaosAction(action) == v1alpha1.Corruption {
-               n.netWorkChaos.Spec.Action = chaosv1alpha1.CorruptAction
+               n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.CorruptAction
        }
 
        if v1alpha1.NetworkChaosAction(action) == v1alpha1.Partition {
-               n.netWorkChaos.Spec.Action = chaosv1alpha1.PartitionAction
+               n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.PartitionAction
        }
 
        if v1alpha1.NetworkChaosAction(action) == v1alpha1.Loss {
-               n.netWorkChaos.Spec.Action = chaosv1alpha1.LossAction
+               n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.LossAction
        }
 
        if v1alpha1.NetworkChaosAction(action) == v1alpha1.Duplication {
-               n.netWorkChaos.Spec.Action = chaosv1alpha1.DuplicateAction
+               n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.DuplicateAction
        }
 
        return n
@@ -473,11 +473,11 @@ func (n *netWorkChaosBuilder) SetDuration(duration 
*string) NetworkChaosBuilder
 }
 
 func (n *netWorkChaosBuilder) SetDirection(direction string) 
NetworkChaosBuilder {
-       n.netWorkChaos.Spec.Direction = chaosv1alpha1.Direction(direction)
+       n.netWorkChaos.Spec.Direction = chaosmeshv1alpha1.Direction(direction)
        return n
 }
 
-func (n *netWorkChaosBuilder) SetTarget(selector *chaosv1alpha1.PodSelector) 
NetworkChaosBuilder {
+func (n *netWorkChaosBuilder) SetTarget(selector 
*chaosmeshv1alpha1.PodSelector) NetworkChaosBuilder {
        n.netWorkChaos.Spec.Target = selector
        return n
 }
@@ -487,12 +487,12 @@ func (n *netWorkChaosBuilder) 
SetTargetDevice(targetDevice string) NetworkChaosB
        return n
 }
 
-func (n *netWorkChaosBuilder) SetTcParameter(parameter 
chaosv1alpha1.TcParameter) NetworkChaosBuilder {
+func (n *netWorkChaosBuilder) SetTcParameter(parameter 
chaosmeshv1alpha1.TcParameter) NetworkChaosBuilder {
        n.netWorkChaos.Spec.TcParameter = parameter
        return n
 }
 
-func (n *netWorkChaosBuilder) Build() *chaosv1alpha1.NetworkChaos {
+func (n *netWorkChaosBuilder) Build() *chaosmeshv1alpha1.NetworkChaos {
        return n.netWorkChaos
 }
 
@@ -514,17 +514,17 @@ type PodSelectorBuilder interface {
        SetLabelSelector(map[string]string) PodSelectorBuilder
        SetExpressionSelectors([]metav1.LabelSelectorRequirement) 
PodSelectorBuilder
        SetAnnotationSelectors(map[string]string) PodSelectorBuilder
-       Build() *chaosv1alpha1.PodSelector
+       Build() *chaosmeshv1alpha1.PodSelector
 }
 
 func NewPodSelectorBuilder() PodSelectorBuilder {
        return &podSelectorBuilder{
-               podSelector: &chaosv1alpha1.PodSelector{},
+               podSelector: &chaosmeshv1alpha1.PodSelector{},
        }
 }
 
 type podSelectorBuilder struct {
-       podSelector *chaosv1alpha1.PodSelector
+       podSelector *chaosmeshv1alpha1.PodSelector
 }
 
 func (p *podSelectorBuilder) SetNamespaces(namespaces []string) 
PodSelectorBuilder {
@@ -533,7 +533,7 @@ func (p *podSelectorBuilder) SetNamespaces(namespaces 
[]string) PodSelectorBuild
 }
 
 func (p *podSelectorBuilder) SetSelectMode(mode string) PodSelectorBuilder {
-       p.podSelector.Mode = chaosv1alpha1.SelectorMode(mode)
+       p.podSelector.Mode = chaosmeshv1alpha1.SelectorMode(mode)
        return p
 }
 
@@ -582,32 +582,32 @@ func (p *podSelectorBuilder) 
SetAnnotationSelectors(annotationSelectors map[stri
        return p
 }
 
-func (p *podSelectorBuilder) Build() *chaosv1alpha1.PodSelector {
+func (p *podSelectorBuilder) Build() *chaosmeshv1alpha1.PodSelector {
        return p.podSelector
 }
 
-func DefaultPodChaos() *chaosv1alpha1.PodChaos {
-       return &chaosv1alpha1.PodChaos{
+func DefaultPodChaos() *chaosmeshv1alpha1.PodChaos {
+       return &chaosmeshv1alpha1.PodChaos{
                ObjectMeta: metav1.ObjectMeta{
                        Name:      "shardingsphere-proxy",
                        Namespace: "default",
                        Labels:    map[string]string{},
                },
-               Spec: chaosv1alpha1.PodChaosSpec{
-                       Action: chaosv1alpha1.ContainerKillAction,
+               Spec: chaosmeshv1alpha1.PodChaosSpec{
+                       Action: chaosmeshv1alpha1.ContainerKillAction,
                },
        }
 }
 
-func DefaultNetworkChaos() *chaosv1alpha1.NetworkChaos {
-       return &chaosv1alpha1.NetworkChaos{
+func DefaultNetworkChaos() *chaosmeshv1alpha1.NetworkChaos {
+       return &chaosmeshv1alpha1.NetworkChaos{
                ObjectMeta: metav1.ObjectMeta{
                        Name:      "shardingsphere-proxy",
                        Namespace: "default",
                        Labels:    map[string]string{},
                },
-               Spec: chaosv1alpha1.NetworkChaosSpec{
-                       Action:    chaosv1alpha1.PartitionAction,
+               Spec: chaosmeshv1alpha1.NetworkChaosSpec{
+                       Action:    chaosmeshv1alpha1.PartitionAction,
                        Direction: "to",
                },
        }
diff --git a/shardingsphere-operator/pkg/kubernetes/chaosmesh/chaosmesh.go 
b/shardingsphere-operator/pkg/kubernetes/chaosmesh/chaosmesh.go
index e54cb52..9e1d3bc 100644
--- a/shardingsphere-operator/pkg/kubernetes/chaosmesh/chaosmesh.go
+++ b/shardingsphere-operator/pkg/kubernetes/chaosmesh/chaosmesh.go
@@ -23,7 +23,7 @@ import (
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
 
-       chaosmeshapi "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
+       chaosmeshv1alpha1 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/types"
        "sigs.k8s.io/controller-runtime/pkg/client"
@@ -85,7 +85,7 @@ type getter struct {
 type PodChaos interface{}
 
 func (cg getter) GetPodChaosByNamespacedName(ctx context.Context, 
namespacedName types.NamespacedName) (PodChaos, error) {
-       chaos := &chaosmeshapi.PodChaos{}
+       chaos := &chaosmeshv1alpha1.PodChaos{}
        if err := cg.Get(ctx, namespacedName, chaos); err != nil {
                if apierrors.IsNotFound(err) {
                        return nil, nil
@@ -99,7 +99,7 @@ func (cg getter) GetPodChaosByNamespacedName(ctx 
context.Context, namespacedName
 type NetworkChaos interface{}
 
 func (cg getter) GetNetworkChaosByNamespacedName(ctx context.Context, 
namespacedName types.NamespacedName) (NetworkChaos, error) {
-       chaos := &chaosmeshapi.NetworkChaos{}
+       chaos := &chaosmeshv1alpha1.NetworkChaos{}
        if err := cg.Get(ctx, namespacedName, chaos); err != nil {
                if apierrors.IsNotFound(err) {
                        return nil, nil
@@ -132,7 +132,7 @@ func (cs setter) CreatePodChaos(ctx context.Context, 
sschaos *v1alpha1.ShardingS
        if err != nil {
                return err
        }
-       return cs.Client.Create(ctx, pc.(*chaosmeshapi.PodChaos))
+       return cs.Client.Create(ctx, pc.(*chaosmeshv1alpha1.PodChaos))
 }
 
 // UpdatePodChaos updates a pod chaos
@@ -141,11 +141,11 @@ func (cs setter) UpdatePodChaos(ctx context.Context, 
podChaos PodChaos, sschaos
        if err != nil {
                return err
        }
-       s, ok := pc.(*chaosmeshapi.PodChaos)
+       s, ok := pc.(*chaosmeshv1alpha1.PodChaos)
        if !ok {
                return ErrConvert
        }
-       t, ok := podChaos.(*chaosmeshapi.PodChaos)
+       t, ok := podChaos.(*chaosmeshv1alpha1.PodChaos)
        if !ok {
                return ErrConvert
        }
@@ -159,7 +159,7 @@ func (cs setter) UpdatePodChaos(ctx context.Context, 
podChaos PodChaos, sschaos
 
 // DeletePodChaos deletes a pod chaos
 func (cs setter) DeletePodChaos(ctx context.Context, chao PodChaos) error {
-       podChao, ok := chao.(*chaosmeshapi.PodChaos)
+       podChao, ok := chao.(*chaosmeshv1alpha1.PodChaos)
        if !ok {
                return ErrConvert
        }
@@ -176,7 +176,7 @@ func (cs setter) CreateNetworkChaos(ctx context.Context, 
sschaos *v1alpha1.Shard
        if err != nil {
                return err
        }
-       return cs.Client.Create(ctx, nc.(*chaosmeshapi.NetworkChaos))
+       return cs.Client.Create(ctx, nc.(*chaosmeshv1alpha1.NetworkChaos))
 }
 
 // UpdateNetworkChaos updates a network chaos
@@ -185,11 +185,11 @@ func (cs setter) UpdateNetworkChaos(ctx context.Context, 
networkChaos NetworkCha
        if err != nil {
                return err
        }
-       s, ok := pc.(*chaosmeshapi.NetworkChaos)
+       s, ok := pc.(*chaosmeshv1alpha1.NetworkChaos)
        if !ok {
                return ErrConvert
        }
-       t, ok := networkChaos.(*chaosmeshapi.NetworkChaos)
+       t, ok := networkChaos.(*chaosmeshv1alpha1.NetworkChaos)
        if !ok {
                return ErrConvert
        }
@@ -202,7 +202,7 @@ func (cs setter) UpdateNetworkChaos(ctx context.Context, 
networkChaos NetworkCha
 }
 
 func (cs setter) DeleteNetworkChaos(ctx context.Context, chao NetworkChaos) 
error {
-       networkChaos, ok := chao.(*chaosmeshapi.NetworkChaos)
+       networkChaos, ok := chao.(*chaosmeshv1alpha1.NetworkChaos)
        if !ok {
                return ErrConvert
        }
diff --git a/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks/store.go 
b/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks/store.go
index 9caffb4..1cb3780 100644
--- a/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks/store.go
+++ b/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks/store.go
@@ -10,6 +10,7 @@ import (
 
        v1alpha1 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
        chaosmesh 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
+
        gomock "github.com/golang/mock/gomock"
        types "k8s.io/apimachinery/pkg/types"
 )
diff --git a/shardingsphere-operator/pkg/kubernetes/configmap/builders.go 
b/shardingsphere-operator/pkg/kubernetes/configmap/builders.go
index 4c68f14..db0ab46 100644
--- a/shardingsphere-operator/pkg/kubernetes/configmap/builders.go
+++ b/shardingsphere-operator/pkg/kubernetes/configmap/builders.go
@@ -21,6 +21,7 @@ import (
        "reflect"
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+
        "gopkg.in/yaml.v2"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
diff --git a/shardingsphere-operator/pkg/pressure/pressure.go 
b/shardingsphere-operator/pkg/pressure/pressure.go
index 061bc3b..49cfb14 100644
--- a/shardingsphere-operator/pkg/pressure/pressure.go
+++ b/shardingsphere-operator/pkg/pressure/pressure.go
@@ -24,8 +24,6 @@ import (
        "sync"
        "time"
 
-       "github.com/database-mesh/golang-sdk/pkg/random"
-
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
        _ "github.com/go-sql-driver/mysql"
 )
@@ -41,8 +39,7 @@ type Pressure struct {
 }
 
 var (
-       db       *sql.DB
-       totalReq int
+       db *sql.DB
 )
 
 type Result struct {
@@ -68,9 +65,9 @@ func NewPressure(name string, tasks []v1alpha1.DistSQL) 
*Pressure {
        }
 }
 
-// todo: get conn args by labels over string
 func initDB(connArgs string) error {
        var err error
+
        db, err = sql.Open("mysql", connArgs)
        if err != nil {
                return err
@@ -84,21 +81,18 @@ func initDB(connArgs string) error {
 
 func (p *Pressure) Run(ctx context.Context, pressureCfg *v1alpha1.PressureCfg) 
{
        p.Active = true
-       totalReq = 0
+       //when all task finished,update active
+       defer func() {
+               p.Active = false
+       }()
 
-       //judge nil for simplify test
-       if db == nil {
-               if err := initDB(pressureCfg.SsHost); err != nil {
-                       p.Err = err
-                       return
-               }
-               defer func() {
-                       if err := db.Close(); err != nil {
-                               p.Err = err
-                       }
-               }()
+       if err := initDB(pressureCfg.SsHost); err != nil {
+               p.Err = err
+               return
        }
 
+       defer db.Close()
+
        result := &p.Result
        pressureCtx, cancel := context.WithTimeout(context.Background(), 
pressureCfg.Duration.Duration)
        defer cancel()
@@ -106,7 +100,7 @@ func (p *Pressure) Run(ctx context.Context, pressureCfg 
*v1alpha1.PressureCfg) {
        resCh := make(chan bool, 1000)
 
        //handle result
-       go p.handleResponse(pressureCtx, resCh, result)
+       go p.handleResponse(resCh, result)
 
        //statistics the running time
        start := time.Now()
@@ -119,7 +113,6 @@ FOR:
                        break FOR
                case <-ticker.C:
                        for i := 0; i < pressureCfg.ConcurrentNum; i++ {
-                               totalReq += pressureCfg.ReqNum
                                //todo: handle err
 
                                //put wg here to prevent: when root ctx is 
closed,but some exec task do not start yet
@@ -139,9 +132,6 @@ FOR:
 
        //wait collect results channel finished
        <-p.finishSignalCh
-
-       //when all task finished,update active
-       p.Active = false
 }
 
 func (p *Pressure) exec(ctx context.Context, times int, res chan bool) {
@@ -151,30 +141,21 @@ func (p *Pressure) exec(ctx context.Context, times int, 
res chan bool) {
                case <-ctx.Done():
                        return
                default:
-               }
-               if len(p.Tasks) == 0 {
-                       return
-               }
-               for i := range p.Tasks {
-                       //generate diff sql, put result into channel
-                       args := randomArgs(p.Tasks[i].Args)
-                       _, err := db.Exec(p.Tasks[i].SQL, args)
-                       res <- err == nil
+                       if len(p.Tasks) == 0 {
+                               return
+                       }
+                       for i := range p.Tasks {
+                               //generate diff sql, put result into channel
+                               args := randomArgs(p.Tasks[i].Args)
+                               _, err := db.Exec(p.Tasks[i].SQL, args...)
+
+                               res <- err == nil
+                       }
                }
        }
 }
 
-func (p *Pressure) handleResponse(ctx context.Context, resCh chan bool, result 
*Result) {
-For:
-       for {
-               select {
-               case <-ctx.Done():
-                       break For
-               case ret := <-resCh:
-                       //todo: add more msg
-                       handle(ret, result)
-               }
-       }
+func (p *Pressure) handleResponse(resCh chan bool, result *Result) {
 
        //get left handleResponse
        for ret := range resCh {
@@ -191,12 +172,13 @@ func handle(ret bool, result *Result) {
                result.Success++
        }
        result.Total++
+
 }
 
-func randomArgs(args []string) []string {
-       var ret []string
+func randomArgs(args []string) []any {
+       var ret []any
        for i := range args {
-               randomArg := fmt.Sprintf("%s-%s", args[i], random.StringN(4))
+               randomArg := fmt.Sprintf("%s-%d", args[i], 
time.Now().UnixNano())
                ret = append(ret, randomArg)
        }
        return ret
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go 
b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
index 855e0fe..dc6663a 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
@@ -24,6 +24,7 @@ import (
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/common"
+
        v1 "k8s.io/api/batch/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
diff --git a/shardingsphere-operator/test/e2e/e2e_suite_test.go 
b/shardingsphere-operator/test/e2e/e2e_suite_test.go
index bd7566b..576af0b 100644
--- a/shardingsphere-operator/test/e2e/e2e_suite_test.go
+++ b/shardingsphere-operator/test/e2e/e2e_suite_test.go
@@ -19,6 +19,9 @@ package e2e
 
 import (
        "context"
+       mockChaos 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks"
+       "github.com/golang/mock/gomock"
+       clientgoscheme "k8s.io/client-go/kubernetes/scheme"
        "os/exec"
        "path/filepath"
        "testing"
@@ -51,6 +54,7 @@ var (
        ctx       context.Context
        cancel    context.CancelFunc
        err       error
+       mockchaos *mockChaos.MockChaos
 )
 
 func TestControllers(t *testing.T) {
@@ -96,6 +100,7 @@ var _ = BeforeSuite(func() {
 
        Expect(v1alpha1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
        Expect(dbmeshv1alpha1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
+       Expect(clientgoscheme.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
        //+kubebuilder:scaffold:scheme
 
        k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
@@ -125,21 +130,19 @@ var _ = BeforeSuite(func() {
        }).SetupWithManager(k8sManager)
        Expect(err).ToNot(HaveOccurred())
 
-       /*
-               ctl := gomock.NewController(GinkgoT())
-               clientset, err := clientset.NewForConfig(k8sManager.GetConfig())
-               err = (&controllers.ShardingSphereChaosReconciler{
-                       Client:    k8sManager.GetClient(),
-                       Scheme:    k8sManager.GetScheme(),
-                       Log:       logf.Log,
-                       Chaos:     mockChaos.NewMockChaos(ctl),
-                       Job:       job.NewJob(k8sManager.GetClient()),
-                       ConfigMap: 
configmap.NewConfigMapClient(k8sManager.GetClient()),
-                       Events:    
k8sManager.GetEventRecorderFor("shardingsphere-chaos-controller"),
-                       ClientSet: clientset,
-               }).SetupWithManager(k8sManager)
-               Expect(err).ToNot(HaveOccurred())
-       */
+       ctl := gomock.NewController(GinkgoT())
+       mockchaos = mockChaos.NewMockChaos(ctl)
+
+       err = (&controllers.ShardingSphereChaosReconciler{
+               Client:    k8sManager.GetClient(),
+               Scheme:    k8sManager.GetScheme(),
+               Log:       logf.Log,
+               Events:    
k8sManager.GetEventRecorderFor("shardingsphere-chaos-controller"),
+               Chaos:     mockchaos,
+               ExecCtrls: make([]*controllers.ExecCtrl, 0),
+               ConfigMap: configmap.NewConfigMapClient(k8sManager.GetClient()),
+       }).SetupWithManager(k8sManager)
+       Expect(err).ToNot(HaveOccurred())
 
        go func() {
                defer GinkgoRecover()
diff --git 
a/shardingsphere-operator/test/e2e/shardingsphere_chaos_controller_test.go 
b/shardingsphere-operator/test/e2e/shardingsphere_chaos_controller_test.go
index 77d3a99..2c30a15 100644
--- a/shardingsphere-operator/test/e2e/shardingsphere_chaos_controller_test.go
+++ b/shardingsphere-operator/test/e2e/shardingsphere_chaos_controller_test.go
@@ -17,77 +17,145 @@
 
 package e2e
 
-/*
 import (
-       "fmt"
-       "math/rand"
+       "database/sql"
+       mockChaos 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks"
+       "github.com/golang/mock/gomock"
+       "regexp"
        "time"
 
+       "bou.ke/monkey"
+       "github.com/DATA-DOG/go-sqlmock"
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
-       corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
 )
 
+func mockchaosStub(chaos *mockChaos.MockChaos) {
+       chaos.EXPECT().NewNetworkChaos(gomock.Any(), 
gomock.Any()).Return(gomock.Any()).AnyTimes()
+       chaos.EXPECT().NewPodChaos(gomock.Any(), 
gomock.Any()).Return(gomock.Any()).AnyTimes()
+
+       chaos.EXPECT().CreatePodChaos(gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+       chaos.EXPECT().CreateNetworkChaos(gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+
+       chaos.EXPECT().DeleteNetworkChaos(gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+       chaos.EXPECT().DeletePodChaos(gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+
+       chaos.EXPECT().UpdatePodChaos(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+       chaos.EXPECT().UpdateNetworkChaos(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+
+       chaos.EXPECT().GetNetworkChaosByNamespacedName(gomock.Any(), 
gomock.Any()).Return(gomock.Any(), nil).AnyTimes()
+       chaos.EXPECT().GetPodChaosByNamespacedName(gomock.Any(), 
gomock.Any()).Return(gomock.Any(), nil).AnyTimes()
+}
+
+func mockDBStub(mock sqlmock.Sqlmock) {
+       mock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE 
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+}
+
+type compare struct {
+       phase           v1alpha1.ChaosPhase
+       conditionStatus []metav1.ConditionStatus
+}
+
 var _ = Describe("ShardingSphereChaos", func() {
-       var d = "5m"
+       var (
+               testNamespacedName = types.NamespacedName{
+                       Namespace: "default",
+                       Name:      "testsschaos",
+               }
+               duration = "5s"
+               db       *sql.DB
+       )
 
-       Context("check related resource created by ShardingSphereChaos 
Controller", func() {
+       BeforeEach(func() {
                var (
-                       ssChaos   *v1alpha1.ShardingSphereChaos
-                       name      = fmt.Sprintf("%s-%d", "test.sschaos-", 
rand.Int31())
-                       namespace = "default"
+                       dbmock sqlmock.Sqlmock
+                       err    error
                )
-               BeforeEach(func() {
-                       ssChaos = &v1alpha1.ShardingSphereChaos{
+
+               db, dbmock, err = sqlmock.New()
+               Expect(err).To(BeNil())
+               Expect(db).NotTo(BeNil())
+               Expect(dbmock).NotTo(BeNil())
+
+               monkey.Patch(sql.Open, func(driverName, dataSourceName string) 
(*sql.DB, error) {
+                       return db, nil
+               })
+               mockchaosStub(mockchaos)
+               mockDBStub(dbmock)
+       })
+
+       AfterEach(func() {
+               monkey.UnpatchAll()
+               db.Close()
+       })
+
+       Context("reconcile ShardingSphereChaos", func() {
+               var desireStatus = compare{
+                       phase:           v1alpha1.AfterChaos,
+                       conditionStatus: 
[]metav1.ConditionStatus{metav1.ConditionTrue, metav1.ConditionTrue, 
metav1.ConditionTrue, metav1.ConditionTrue},
+               }
+
+               It("should create successfully", func() {
+                       ssChaos := &v1alpha1.ShardingSphereChaos{
                                ObjectMeta: metav1.ObjectMeta{
-                                       Name:      name,
-                                       Namespace: namespace,
-                                       Labels: map[string]string{
-                                               "app": "shardingsphere-proxy",
-                                       },
-                                       Annotations: map[string]string{
-                                               "spec/mode": "all",
-                                       },
+                                       Name:      testNamespacedName.Name,
+                                       Namespace: testNamespacedName.Namespace,
                                },
                                Spec: v1alpha1.ShardingSphereChaosSpec{
                                        EmbedChaos: v1alpha1.EmbedChaos{
                                                PodChaos: 
&v1alpha1.PodChaosSpec{
                                                        PodSelector: 
v1alpha1.PodSelector{
-                                                               Namespaces: 
[]string{"mesh-test"},
                                                                LabelSelectors: 
map[string]string{
-                                                                       
"app.kubernetes.io/component": "zookeeper-new",
+                                                                       
"app.kubernetes.io/component": "zookeeper",
                                                                },
                                                        },
                                                        Action: 
v1alpha1.PodFailure,
                                                        Params: 
v1alpha1.PodChaosParams{
                                                                PodFailure: 
&v1alpha1.PodFailureParams{
-                                                                       
Duration: &d,
+                                                                       
Duration: &duration,
                                                                },
                                                        },
                                                },
                                        },
+                                       PressureCfg: v1alpha1.PressureCfg{
+                                               SsHost:   "127.0.0.1:3306/ds_1",
+                                               Duration: 
metav1.Duration{Duration: 10 * time.Second},
+                                               ReqTime:  
metav1.Duration{Duration: 5 * time.Second},
+                                               DistSQLs: []v1alpha1.DistSQL{
+                                                       {
+                                                               SQL:  "REGISTER 
STORAGE UNIT ?()",
+                                                               Args: 
[]string{"ds_1"},
+                                                       },
+                                               },
+                                               ConcurrentNum: 2,
+                                               ReqNum:        5,
+                                       },
                                },
+                               Status: v1alpha1.ShardingSphereChaosStatus{},
                        }
-                       Expect(k8sClient.Create(ctx, ssChaos)).To(BeNil())
-               })
 
-               AfterEach(func() {
-                       Expect(k8sClient.Delete(ctx, ssChaos)).To(BeNil())
-               })
+                       Expect(k8sClient.Create(ctx, ssChaos)).Should(Succeed())
+
+                       Eventually(func() compare {
+                               var chaos v1alpha1.ShardingSphereChaos
+                               Expect(k8sClient.Get(ctx, testNamespacedName, 
&chaos)).Should(Succeed())
+                               now := compare{
+                                       phase:           chaos.Status.Phase,
+                                       conditionStatus: 
make([]metav1.ConditionStatus, 0),
+                               }
+                               for i := range chaos.Status.Conditions {
+                                       now.conditionStatus = 
append(now.conditionStatus, chaos.Status.Conditions[i].Status)
+                               }
+
+                               return now
+                       }, 25*time.Second, 
1*time.Second).Should(Equal(desireStatus))
 
-               It("should create configmap", func() {
-                       configmap := &corev1.ConfigMap{}
-                       namespacedName := types.NamespacedName{Name: name, 
Namespace: namespace}
-                       Eventually(func() bool {
-                               err := k8sClient.Get(ctx, namespacedName, 
configmap)
-                               return err == nil
-                       }, time.Second*10, 
time.Millisecond*250).Should(BeTrue())
+                       Expect(k8sClient.Delete(ctx, ssChaos)).Should(Succeed())
                })
 
        })
 
 })
-*/

Reply via email to