This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b4ae4c  feat(operator): add-pressure-exec replace job(get pod log)
     new 8e895b9  Merge pull request #358 from moomman/add-pressure-exec
6b4ae4c is described below

commit 6b4ae4caa25978541cc0314d0919398f9348aa7d
Author: moonman <[email protected]>
AuthorDate: Wed May 10 18:44:58 2023 +0800

    feat(operator): add-pressure-exec replace job(get pod log)
---
 .../api/v1alpha1/shardingsphere_chaos_types.go     |  21 +-
 .../api/v1alpha1/zz_generated.deepcopy.go          |  61 ++-
 .../cmd/shardingsphere-operator/manager/option.go  |  19 +-
 shardingsphere-operator/go.mod                     |   4 +-
 shardingsphere-operator/go.sum                     |   6 +
 .../controllers/shardingsphere_chaos_controller.go | 417 ++++++---------------
 shardingsphere-operator/pkg/pressure/pressure.go   | 203 ++++++++++
 .../pkg/pressure/pressure_test.go                  |  91 +++++
 8 files changed, 483 insertions(+), 339 deletions(-)

diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index e1c5e90..43431e4 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -42,13 +42,24 @@ type ShardingSphereChaos struct {
 
 // ShardingSphereChaosSpec defines the desired state of ShardingSphereChaos
 type ShardingSphereChaosSpec struct {
-       InjectJob  JobSpec `json:"injectJob,omitempty"`
-       EmbedChaos `json:",inline"`
-       Expect     Expect `json:"expect,omitempty"`
+       InjectJob   JobSpec `json:"injectJob,omitempty"`
+       EmbedChaos  `json:",inline"`
+       PressureCfg PressureCfg `json:"pressureCfg"`
 }
 
-type Expect struct {
-       Verify string `json:"verify,omitempty"`
+type PressureCfg struct {
+       ZkHost        string          `json:"zkHost,omitempty"`
+       SsHost        string          `json:"ssHost"`
+       Duration      metav1.Duration `json:"duration"`
+       ReqTime       metav1.Duration `json:"reqTime"`
+       DistSQLs      []DistSQL       `json:"distSQLs,omitempty"`
+       ConcurrentNum int             `json:"concurrentNum"`
+       ReqNum        int             `json:"reqNum"`
+}
+
+type DistSQL struct {
+       SQL  string   `json:"sql"`
+       Args []string `json:"args"`
 }
 
 type Script string
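
The new PressureCfg and DistSQL types above replace the old Expect field. A minimal Go sketch of how they might be populated — the durations, counts and DistSQL mirror the values used in pressure_test.go later in this commit, while SsHost is an assumed value, not something taken from the change:

    package main

    import (
        "fmt"
        "time"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

        "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
    )

    func main() {
        // pkg/pressure hands SsHost straight to database/sql's mysql driver,
        // so a DSN is assumed here; everything else mirrors pressure_test.go.
        cfg := v1alpha1.PressureCfg{
            SsHost:        "root:root@tcp(ss-proxy:3307)/",             // assumed DSN, not from the commit
            Duration:      metav1.Duration{Duration: 20 * time.Second}, // total length of the pressure run
            ReqTime:       metav1.Duration{Duration: 5 * time.Second},  // interval between request batches
            ConcurrentNum: 2,                                           // goroutines started per interval
            ReqNum:        5,                                           // iterations each goroutine performs
            DistSQLs: []v1alpha1.DistSQL{
                {SQL: "REGISTER STORAGE UNIT ?", Args: []string{"**"}},
            },
        }
        fmt.Printf("%+v\n", cfg)
    }
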
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index 466979d..f2e6b08 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -550,6 +550,26 @@ func (in *DelayParams) DeepCopy() *DelayParams {
        return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DistSQL) DeepCopyInto(out *DistSQL) {
+       *out = *in
+       if in.Args != nil {
+               in, out := &in.Args, &out.Args
+               *out = make([]string, len(*in))
+               copy(*out, *in)
+       }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DistSQL.
+func (in *DistSQL) DeepCopy() *DistSQL {
+       if in == nil {
+               return nil
+       }
+       out := new(DistSQL)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *DuplicationParams) DeepCopyInto(out *DuplicationParams) {
        *out = *in
@@ -605,21 +625,6 @@ func (in *Endpoint) DeepCopy() *Endpoint {
        return out
 }
 
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *Expect) DeepCopyInto(out *Expect) {
-       *out = *in
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Expect.
-func (in *Expect) DeepCopy() *Expect {
-       if in == nil {
-               return nil
-       }
-       out := new(Expect)
-       in.DeepCopyInto(out)
-       return out
-}
-
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *InstanceStatus) DeepCopyInto(out *InstanceStatus) {
        *out = *in
@@ -1047,6 +1052,30 @@ func (in *PortBinding) DeepCopy() *PortBinding {
        return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PressureCfg) DeepCopyInto(out *PressureCfg) {
+       *out = *in
+       out.Duration = in.Duration
+       out.ReqTime = in.ReqTime
+       if in.DistSQLs != nil {
+               in, out := &in.DistSQLs, &out.DistSQLs
+               *out = make([]DistSQL, len(*in))
+               for i := range *in {
+                       (*in)[i].DeepCopyInto(&(*out)[i])
+               }
+       }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PressureCfg.
+func (in *PressureCfg) DeepCopy() *PressureCfg {
+       if in == nil {
+               return nil
+       }
+       out := new(PressureCfg)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *Privilege) DeepCopyInto(out *Privilege) {
        *out = *in
@@ -1436,7 +1465,7 @@ func (in *ShardingSphereChaosSpec) DeepCopyInto(out *ShardingSphereChaosSpec) {
        *out = *in
        out.InjectJob = in.InjectJob
        in.EmbedChaos.DeepCopyInto(&out.EmbedChaos)
-       out.Expect = in.Expect
+       in.PressureCfg.DeepCopyInto(&out.PressureCfg)
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaosSpec.
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index ab69d45..9314332 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -21,6 +21,8 @@ import (
        "flag"
        "strings"
 
+       "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
+
        "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
        "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
        sschaos "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
@@ -179,14 +181,15 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
                        return err
                }
                if err := (&controllers.ShardingSphereChaosReconciler{
-                       Client:    mgr.GetClient(),
-                       Scheme:    mgr.GetScheme(),
-                       Log:       mgr.GetLogger(),
-                       Chaos:     sschaos.NewChaos(mgr.GetClient()),
-                       Job:       job.NewJob(mgr.GetClient()),
-                       ConfigMap: configmap.NewConfigMapClient(mgr.GetClient()),
-                       Events:    mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
-                       ClientSet: clientset,
+                       Client:       mgr.GetClient(),
+                       Scheme:       mgr.GetScheme(),
+                       Log:          mgr.GetLogger(),
+                       Chaos:        sschaos.NewChaos(mgr.GetClient()),
+                       Job:          job.NewJob(mgr.GetClient()),
+                       ExecRecorder: make([]*pressure.Pressure, 0),
+                       ConfigMap:    configmap.NewConfigMapClient(mgr.GetClient()),
+                       Events:       mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
+                       ClientSet:    clientset,
                }).SetupWithManager(mgr); err != nil {
                        logger.Error(err, "unable to create controller", "controller", "ShardingSphereChaos")
                        return err
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index d20fc79..621957a 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -6,9 +6,9 @@ require (
        bou.ke/monkey v1.0.2
        github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
        github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230410023700-25a841a23cd2
-       github.com/golang/mock v1.6.0
        github.com/database-mesh/golang-sdk v0.0.0-20230420101548-53265cd9883a
        github.com/go-logr/logr v1.2.4
+       github.com/golang/mock v1.6.0
        github.com/onsi/ginkgo/v2 v2.8.0
        github.com/onsi/gomega v1.26.0
        github.com/prometheus/client_golang v1.14.0
@@ -25,6 +25,7 @@ require (
 require github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
 
 require (
+       github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
        github.com/aws/aws-sdk-go-v2 v1.17.5 // indirect
        github.com/aws/aws-sdk-go-v2/config v1.18.4 // indirect
        github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
@@ -50,6 +51,7 @@ require (
        github.com/go-openapi/jsonpointer v0.19.6 // indirect
        github.com/go-openapi/jsonreference v0.20.2 // indirect
        github.com/go-openapi/swag v0.22.3 // indirect
+       github.com/go-sql-driver/mysql v1.7.0 // indirect
        github.com/gogo/protobuf v1.3.2 // indirect
        github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
        github.com/golang/protobuf v1.5.3 // indirect
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index eee5ba1..863342f 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -3,6 +3,8 @@ bou.ke/monkey v1.0.2/go.mod h1:OqickVX3tNx6t33n1xvtTtu85YN5s6cKwVug+oHMaIA=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
+github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
@@ -93,6 +95,10 @@ github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv
 github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
 github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
 github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
+github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
+github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
+github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 2123747..089abfe 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -21,7 +21,8 @@ import (
        "context"
        "errors"
        "fmt"
-       "time"
+
+       "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
 
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
@@ -37,10 +38,8 @@ import (
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/types"
-       "k8s.io/apimachinery/pkg/util/wait"
        clientset "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/record"
-       "k8s.io/client-go/util/retry"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -70,9 +69,10 @@ type ShardingSphereChaosReconciler struct {
        Events    record.EventRecorder
        ClientSet *clientset.Clientset
 
-       Chaos     sschaos.Chaos
-       Job       job.Job
-       ConfigMap configmap.ConfigMap
+       Chaos        sschaos.Chaos
+       Job          job.Job
+       ExecRecorder []*pressure.Pressure
+       ConfigMap    configmap.ConfigMap
 }
 
 // Reconcile handles main function of this controller
@@ -114,12 +114,10 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.
                r.Events.Event(ssChaos, "Warning", "configmap error", err.Error())
        }
 
-       if err := r.reconcileJob(ctx, ssChaos); err != nil {
+       if err := r.reconcilePressure(ctx, ssChaos); err != nil {
                if err != nil {
                        errors = append(errors, err)
                }
-               logger.Error(err, "reconcile job error")
-               r.Events.Event(ssChaos, "Warning", "job error", err.Error())
        }
 
        if err := r.reconcileStatus(ctx, ssChaos); err != nil {
@@ -134,6 +132,106 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.
        return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
 }
 
+func (r *ShardingSphereChaosReconciler) reconcilePressure(ctx context.Context, chao *v1alpha1.ShardingSphereChaos) error {
+       exec := r.getNeedExec(chao)
+
+       // if the exec for this phase does not exist yet, create and start it
+       if exec == nil {
+               exec := pressure.NewPressure(getExecName(chao), chao.Spec.PressureCfg.DistSQLs)
+               go exec.Run(ctx, &chao.Spec.PressureCfg)
+               r.ExecRecorder = append(r.ExecRecorder, exec)
+       }
+
+       return nil
+}
+
+func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
+       namespacedName := types.NamespacedName{
+               Name:      chaos.Name,
+               Namespace: chaos.Namespace,
+       }
+
+       setDefaultStatus(chaos)
+       r.updatePhaseExec(chaos)
+
+       if err := r.updateChaosCondition(ctx, chaos); err != nil {
+               return err
+       }
+
+       rt, err := r.getRuntimeChaos(ctx, namespacedName)
+       if err != nil {
+               return err
+       }
+       rt.Status = chaos.Status
+
+       return r.Status().Update(ctx, rt)
+}
+
+func (r *ShardingSphereChaosReconciler) updatePhaseExec(chaos *v1alpha1.ShardingSphereChaos) {
+       exec := r.getNeedExec(chaos)
+       if exec == nil || exec.Active {
+               return
+       }
+
+       //todo: judge error
+
+       msg := generateMsgFromExec(exec)
+       //when exec finished, update phase
+       switch chaos.Status.Phase {
+       case v1alpha1.BeforeSteady:
+               chaos.Status.Result.Steady = *msg
+               chaos.Status.Phase = v1alpha1.BeforeChaos
+       case v1alpha1.BeforeChaos:
+               chaos.Status.Result.Chaos = *msg
+               chaos.Status.Phase = v1alpha1.AfterChaos
+       }
+
+}
+
+func generateMsgFromExec(exec *pressure.Pressure) *v1alpha1.Msg {
+       //todo: wait to change result compute way
+       rate := 0
+       if exec.Result.Total == 0 {
+               rate = 0
+       } else {
+               rate = exec.Result.Success / exec.Result.Total
+       }
+       msg := v1alpha1.Msg{
+               Result:   fmt.Sprintf("%d", rate),
+               Duration: exec.Result.Duration.String(),
+       }
+       if exec.Err != nil {
+               msg.FailureDetails = exec.Err.Error()
+       }
+
+       return &msg
+}
+
+func getExecName(chao *v1alpha1.ShardingSphereChaos) string {
+       var execName string
+       if chao.Status.Phase == v1alpha1.BeforeSteady || chao.Status.Phase == v1alpha1.AfterSteady {
+               execName = reconcile.MakeJobName(chao.Name, reconcile.InSteady)
+       }
+       if chao.Status.Phase == v1alpha1.BeforeChaos || chao.Status.Phase == v1alpha1.AfterChaos {
+               execName = reconcile.MakeJobName(chao.Name, reconcile.InChaos)
+       }
+
+       return execName
+}
+
+func (r *ShardingSphereChaosReconciler) getNeedExec(chao *v1alpha1.ShardingSphereChaos) *pressure.Pressure {
+       jobName := getExecName(chao)
+
+       // return the recorded pressure exec for this phase, if any
+       for i := range r.ExecRecorder {
+               if r.ExecRecorder[i].Name == jobName {
+                       return r.ExecRecorder[i]
+               }
+       }
+
+       return nil
+}
+
 func (r *ShardingSphereChaosReconciler) getRuntimeChaos(ctx context.Context, name types.NamespacedName) (*v1alpha1.ShardingSphereChaos, error) {
        var rt = &v1alpha1.ShardingSphereChaos{}
        err := r.Get(ctx, name, rt)
@@ -215,7 +313,7 @@ func (r *ShardingSphereChaosReconciler) deleteNetworkChaos(ctx context.Context,
 func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
        logger := r.Log.WithValues("reconcile shardingspherechaos", fmt.Sprintf("%s/%s", chaos.Namespace, chaos.Name))
 
-       if chaos.Status.Phase == v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
+       if chaos.Status.Phase == "" || chaos.Status.Phase == v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
                return nil
        }
 
@@ -336,221 +434,12 @@ func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx context.Context,
        return nil
 }
 
-func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
-       var jobType reconcile.JobType
-
-       switch chaos.Status.Phase {
-       case v1alpha1.BeforeChaos, v1alpha1.AfterChaos:
-               jobType = reconcile.InChaos
-       case v1alpha1.BeforeSteady, v1alpha1.AfterSteady:
-               jobType = reconcile.InSteady
-       default:
-               jobType = reconcile.InSteady
-       }
-
-       namespaceName := types.NamespacedName{
-               Namespace: chaos.Namespace,
-               Name:      reconcile.MakeJobName(chaos.Name, jobType),
-       }
-
-       job, err := r.getJobByNamespacedName(ctx, namespaceName)
-       if err != nil {
-               return err
-       }
-
-       if job != nil {
-               return r.updateJob(ctx, jobType, chaos, job)
-       }
-
-       return r.createJob(ctx, jobType, chaos)
-}
-
-func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
-       namespacedName := types.NamespacedName{
-               Name:      chaos.Name,
-               Namespace: chaos.Namespace,
-       }
-       chaos, err := r.getRuntimeChaos(ctx, namespacedName)
-       if err != nil {
-               if apierrors.IsNotFound(err) {
-                       return nil
-               }
-               return err
-       }
-       setDefaultStatus(chaos)
-
-       req := getJobType(chaos)
-       job, err := r.getJobByNamespacedName(ctx, types.NamespacedName{Namespace: chaos.Namespace, Name: reconcile.MakeJobName(chaos.Name, req)})
-       if err != nil || job == nil {
-               return err
-       }
-
-       updatePhase(chaos, job)
-
-       condition := getJobCondition(job.Status.Conditions)
-       if condition == FailureJob {
-               r.Events.Event(chaos, "Warning", "failed", fmt.Sprintf("job: %s", job.Name))
-       }
-
-       //update result,when one part job finished,rely on current status
-       if chaos.Status.Phase == v1alpha1.AfterSteady || chaos.Status.Phase == v1alpha1.AfterChaos {
-               if err := r.collectJobMsg(ctx, chaos, job); err != nil {
-                       r.Events.Event(chaos, "Warning", "getPodLog", err.Error())
-                       return err
-               }
-       }
-
-       if err := r.updateChaosCondition(ctx, chaos); err != nil {
-               return err
-       }
-
-       rt, err := r.getRuntimeChaos(ctx, namespacedName)
-       if err != nil {
-               return err
-       }
-       rt.Status = chaos.Status
-
-       return r.Status().Update(ctx, rt)
-}
-
-func updatePhase(chaos *v1alpha1.ShardingSphereChaos, job *batchV1.Job) {
-       switch {
-       //in this phase,update to next,and collect job msg
-       case chaos.Status.Phase == v1alpha1.BeforeSteady && job.Status.Succeeded == 1:
-               chaos.Status.Phase = v1alpha1.AfterSteady
-               //update in next reconcile,wait steady msg collection
-       case chaos.Status.Phase == v1alpha1.AfterSteady && chaos.Status.Result.Steady.Result != "":
-               chaos.Status.Phase = v1alpha1.BeforeChaos
-       case chaos.Status.Phase == v1alpha1.BeforeChaos && job.Status.Succeeded == 1:
-               chaos.Status.Phase = v1alpha1.AfterChaos
-       }
-
-}
-
 func setDefaultStatus(chaos *v1alpha1.ShardingSphereChaos) {
        if chaos.Status.Phase == "" {
                chaos.Status.Phase = v1alpha1.BeforeSteady
        }
 }
 
-// getJobType to get the coming job requirement
-// * BeforeSteady: it hasn't been started, could start a new experiment
-// * AfterSteady: it has been finished, could start a new experiment
-// * InjectChaos: it has been started, could start some pressure
-// * recoveredChaos: it has been recovered, could start to verify
-func getJobType(ssChaos *v1alpha1.ShardingSphereChaos) reconcile.JobType {
-       var jobType reconcile.JobType
-
-       if ssChaos.Status.Phase == v1alpha1.BeforeSteady || ssChaos.Status.Phase == v1alpha1.AfterSteady {
-               jobType = reconcile.InSteady
-       }
-
-       if ssChaos.Status.Phase == v1alpha1.BeforeChaos || ssChaos.Status.Phase == v1alpha1.AfterChaos {
-               jobType = reconcile.InChaos
-       }
-
-       return jobType
-}
-
-func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*batchV1.Job, error) {
-       job, err := r.Job.GetByNamespacedName(ctx, namespacedName)
-       if err != nil {
-               return nil, err
-       }
-       return job, nil
-}
-
-func getJobCondition(conditions []batchV1.JobCondition) JobCondition {
-       var ret = ActiveJob
-       for i := range conditions {
-               p := &conditions[i]
-               switch {
-               case p.Type == batchV1.JobComplete && p.Status == corev1.ConditionTrue:
-                       ret = CompleteJob
-               case p.Type == batchV1.JobFailed && p.Status == corev1.ConditionTrue:
-                       ret = FailureJob
-               case p.Type == batchV1.JobSuspended && p.Status == corev1.ConditionTrue:
-                       ret = SuspendJob
-               case p.Type == batchV1.JobFailureTarget:
-                       ret = FailureJob
-               }
-
-       }
-       return ret
-}
-
-func (r *ShardingSphereChaosReconciler) collectJobMsg(ctx context.Context, ssChaos *v1alpha1.ShardingSphereChaos, rJob *batchV1.Job) error {
-       if isExistResult(ssChaos) {
-               return nil
-       }
-
-       condition := getJobCondition(rJob.Status.Conditions)
-       var result *v1alpha1.Msg
-       if ssChaos.Status.Phase == v1alpha1.AfterSteady {
-               result = &ssChaos.Status.Result.Steady
-       } else if ssChaos.Status.Phase == v1alpha1.AfterChaos {
-               result = &ssChaos.Status.Result.Chaos
-       }
-
-       if condition == CompleteJob {
-               log, err := r.getPodLog(ctx, rJob)
-               if err != nil {
-                       return err
-               }
-               //todo: unpack msg with json
-               //if err := json.Unmarshal(log, result); err != nil {
-               //      return err
-               //}
-               result.Result = string(log)
-       }
-
-       if condition == FailureJob {
-               log, err := r.getPodLog(ctx, rJob)
-               if err != nil {
-                       return err
-               }
-               result.FailureDetails = string(log)
-       }
-
-       return nil
-}
-
-func isExistResult(ssChaos *v1alpha1.ShardingSphereChaos) bool {
-       if ssChaos.Status.Phase == v1alpha1.AfterSteady && ssChaos.Status.Result.Steady.Result == "" {
-               return true
-       }
-       if ssChaos.Status.Phase == v1alpha1.AfterChaos && ssChaos.Status.Result.Chaos.Result == "" {
-               return true
-       }
-
-       return false
-}
-
-// FIXME: this will broke if the log is too long
-func (r *ShardingSphereChaosReconciler) getPodLog(ctx context.Context, rJob *batchV1.Job) ([]byte, error) {
-       pods := &corev1.PodList{}
-
-       if err := r.List(ctx, pods, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
-               return nil, err
-       }
-       if pods.Items == nil || len(pods.Items) == 0 {
-               return nil, nil
-       }
-
-       pod := &pods.Items[0]
-
-       req := r.ClientSet.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{})
-       res := req.Do(ctx)
-       if res.Error() != nil {
-               return []byte{}, res.Error()
-       }
-       ret, err := res.Raw()
-       if err != nil {
-               return []byte{}, err
-       }
-       return ret, nil
-}
-
 func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
        namespacedName := types.NamespacedName{
                Namespace: chaos.Namespace,
@@ -617,96 +506,6 @@ func (r *ShardingSphereChaosReconciler) createConfigMap(ctx context.Context, cha
        return err
 }
 
-// TODO: consider a new job name pattern
-func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, requirement reconcile.JobType, chao *v1alpha1.ShardingSphereChaos, cur *batchV1.Job) error {
-       isEqual, err := reconcile.IsJobChanged(chao, requirement, cur)
-       if err != nil {
-               return err
-       }
-       if !isEqual {
-               if err := r.Delete(ctx, cur); err != nil && !apierrors.IsNotFound(err) {
-                       return err
-               }
-       }
-       return nil
-}
-
-func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requirement reconcile.JobType, chao *v1alpha1.ShardingSphereChaos) error {
-       injectJob, err := reconcile.NewJob(chao, requirement)
-       if err != nil {
-               return err
-       }
-       if err := ctrl.SetControllerReference(chao, injectJob, r.Scheme); err != nil {
-               return err
-       }
-
-       err = r.Create(ctx, injectJob)
-       if err != nil {
-               return client.IgnoreAlreadyExists(err)
-       }
-       //FIXME: consider remove the following L620-L676, perhaps don't need to pay much attention to the pod
-       rJob := &batchV1.Job{}
-       backoff := wait.Backoff{
-               Steps:    6,
-               Duration: 500 * time.Millisecond,
-               Factor:   5.0,
-               Jitter:   0.1,
-       }
-
-       if err := retry.OnError(
-               backoff,
-               func(e error) bool {
-                       return true
-               },
-               func() error {
-                       return r.Get(ctx, types.NamespacedName{Namespace: chao.Namespace, Name: reconcile.MakeJobName(chao.Name, requirement)}, rJob)
-               },
-       ); err != nil {
-               return err
-       }
-
-       podList := &corev1.PodList{}
-       if err := retry.OnError(backoff, func(e error) bool {
-               return e != nil
-       }, func() error {
-               if err := r.List(ctx, podList, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
-                       return err
-               }
-               if len(podList.Items) == 0 {
-                       return ErrNoPod
-               }
-               return nil
-       }); err != nil {
-               return err
-       }
-
-       for i := range podList.Items {
-               rPod := &podList.Items[i]
-               if err := ctrl.SetControllerReference(rJob, rPod, r.Scheme); err != nil {
-                       return err
-               }
-
-               exp := rPod.DeepCopy()
-               updateBackoff := wait.Backoff{
-                       Steps:    5,
-                       Duration: 30 * time.Millisecond,
-                       Factor:   5.0,
-                       Jitter:   0.1,
-               }
-               if err := retry.RetryOnConflict(updateBackoff, func() error {
-                       if err := r.Update(ctx, exp); err != nil {
-                               return err
-                       }
-                       return nil
-               }); err != nil {
-                       return err
-               }
-       }
-
-       r.Events.Event(chao, "Normal", "Created", fmt.Sprintf("%s job created", requirement))
-       return nil
-}
-
 // SetupWithManager sets up the controller with the Manager.
 func (r *ShardingSphereChaosReconciler) SetupWithManager(mgr ctrl.Manager) error {
        return ctrl.NewControllerManagedBy(mgr).
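
With the Job-based flow above removed, each reconcile now derives an exec name from the current phase, starts an in-process pressure run if none is recorded yet, and only advances the phase once that run reports Active == false. A self-contained sketch of that lookup-or-advance pattern — the types and names below are illustrative stand-ins, not the operator's actual API:

    package main

    import "fmt"

    // phase mirrors the v1alpha1 phase constants consulted by updatePhaseExec.
    type phase string

    const (
        beforeSteady phase = "BeforeSteady"
        beforeChaos  phase = "BeforeChaos"
        afterChaos   phase = "AfterChaos"
    )

    // run stands in for *pressure.Pressure: started once, it flips active to false when it finishes.
    type run struct {
        name   string
        active bool
    }

    type reconciler struct{ runs map[string]*run }

    // ensureRun mirrors getNeedExec + reconcilePressure: look up the run for the
    // current phase and record a new one if it does not exist yet.
    func (r *reconciler) ensureRun(p phase) *run {
        name := "exec-" + string(p) // stand-in for reconcile.MakeJobName
        if existing, ok := r.runs[name]; ok {
            return existing
        }
        started := &run{name: name, active: true}
        r.runs[name] = started
        // the real controller launches it with: go exec.Run(ctx, &chao.Spec.PressureCfg)
        return started
    }

    // advance mirrors updatePhaseExec: the phase only moves on after the current run has finished.
    func advance(p phase, r *run) phase {
        if r == nil || r.active {
            return p
        }
        switch p {
        case beforeSteady:
            return beforeChaos
        case beforeChaos:
            return afterChaos
        }
        return p
    }

    func main() {
        rec := &reconciler{runs: map[string]*run{}}
        p := beforeSteady
        r := rec.ensureRun(p)
        r.active = false // pretend the steady-state run has finished
        p = advance(p, r)
        fmt.Println(p) // BeforeChaos: the next reconcile starts the chaos-phase run
    }
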
diff --git a/shardingsphere-operator/pkg/pressure/pressure.go b/shardingsphere-operator/pkg/pressure/pressure.go
new file mode 100644
index 0000000..061bc3b
--- /dev/null
+++ b/shardingsphere-operator/pkg/pressure/pressure.go
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pressure
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/database-mesh/golang-sdk/pkg/random"
+
+       "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+       _ "github.com/go-sql-driver/mysql"
+)
+
+type Pressure struct {
+       Active         bool
+       Name           string
+       Result         Result
+       Err            error
+       Tasks          []v1alpha1.DistSQL
+       finishSignalCh chan struct{}
+       wg             sync.WaitGroup
+}
+
+var (
+       db       *sql.DB
+       totalReq int
+)
+
+type Result struct {
+       // total number of requests executed
+       Total int
+       // number of successful requests
+       Success int
+       // todo: decide whether to report only totals or per-exec results
+
+       // total time spent in this Pressure execution
+       Duration time.Duration
+}
+
+func NewPressure(name string, tasks []v1alpha1.DistSQL) *Pressure {
+       return &Pressure{
+               Active:         false,
+               Name:           name,
+               Result:         Result{},
+               Err:            nil,
+               Tasks:          tasks,
+               wg:             sync.WaitGroup{},
+               finishSignalCh: make(chan struct{}),
+       }
+}
+
+// todo: get conn args by labels over string
+func initDB(connArgs string) error {
+       var err error
+       db, err = sql.Open("mysql", connArgs)
+       if err != nil {
+               return err
+       }
+       if err := db.Ping(); err != nil {
+               return err
+       }
+       db.SetConnMaxLifetime(60 * time.Second)
+       return nil
+}
+
+func (p *Pressure) Run(ctx context.Context, pressureCfg *v1alpha1.PressureCfg) {
+       p.Active = true
+       totalReq = 0
+
+       // only initialize db when it is nil, which keeps the tests simple
+       if db == nil {
+               if err := initDB(pressureCfg.SsHost); err != nil {
+                       p.Err = err
+                       return
+               }
+               defer func() {
+                       if err := db.Close(); err != nil {
+                               p.Err = err
+                       }
+               }()
+       }
+
+       result := &p.Result
+       pressureCtx, cancel := context.WithTimeout(context.Background(), pressureCfg.Duration.Duration)
+       defer cancel()
+       ticker := time.NewTicker(pressureCfg.ReqTime.Duration)
+       resCh := make(chan bool, 1000)
+
+       //handle result
+       go p.handleResponse(pressureCtx, resCh, result)
+
+       // measure the total running time
+       start := time.Now()
+FOR:
+       for {
+               select {
+               case <-ctx.Done():
+                       break FOR
+               case <-pressureCtx.Done():
+                       break FOR
+               case <-ticker.C:
+                       for i := 0; i < pressureCfg.ConcurrentNum; i++ {
+                               totalReq += pressureCfg.ReqNum
+                               //todo: handle err
+
+                               // add to the WaitGroup before launching, so exec goroutines started just as the root ctx closes are still waited on
+                               p.wg.Add(1)
+                               go p.exec(pressureCtx, pressureCfg.ReqNum, resCh)
+                       }
+               }
+       }
+
+       // reached once pressureCtx or the root ctx has been closed
+
+       // wait for all exec calls to return so the result channel can be closed safely
+       p.wg.Wait()
+       end := time.Now()
+       p.Result.Duration = end.Sub(start)
+       close(resCh)
+
+       // wait until result collection has finished
+       <-p.finishSignalCh
+
+       // all tasks have finished; mark the pressure run as inactive
+       p.Active = false
+}
+
+func (p *Pressure) exec(ctx context.Context, times int, res chan bool) {
+       defer p.wg.Done()
+       for i := 0; i < times; i++ {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
+               }
+               if len(p.Tasks) == 0 {
+                       return
+               }
+               for i := range p.Tasks {
+                       // execute each DistSQL with randomized args and push the outcome into the channel
+                       args := randomArgs(p.Tasks[i].Args)
+                       _, err := db.Exec(p.Tasks[i].SQL, args)
+                       res <- err == nil
+               }
+       }
+}
+
+func (p *Pressure) handleResponse(ctx context.Context, resCh chan bool, result *Result) {
+For:
+       for {
+               select {
+               case <-ctx.Done():
+                       break For
+               case ret := <-resCh:
+                       //todo: add more msg
+                       handle(ret, result)
+               }
+       }
+
+       // drain any results left in the channel
+       for ret := range resCh {
+               handle(ret, result)
+       }
+
+       // all results have been handled; signal completion on the finish channel
+       p.finishSignalCh <- struct{}{}
+}
+
+//todo:add more logic and change ret type(bool ---> struct)
+func handle(ret bool, result *Result) {
+       if ret {
+               result.Success++
+       }
+       result.Total++
+}
+
+func randomArgs(args []string) []string {
+       var ret []string
+       for i := range args {
+               randomArg := fmt.Sprintf("%s-%s", args[i], random.StringN(4))
+               ret = append(ret, randomArg)
+       }
+       return ret
+}
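
Outside the controller, the new runner can also be driven directly. A hedged usage sketch — the DSN is an assumption (Run hands PressureCfg.SsHost straight to the mysql driver), so without a reachable Proxy endpoint the run simply records an error in Err:

    package main

    import (
        "context"
        "fmt"
        "time"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

        "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
        "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
    )

    func main() {
        cfg := &v1alpha1.PressureCfg{
            SsHost:        "root:root@tcp(127.0.0.1:3307)/", // assumed DSN; replace with a real Proxy endpoint
            Duration:      metav1.Duration{Duration: 10 * time.Second},
            ReqTime:       metav1.Duration{Duration: 2 * time.Second},
            ConcurrentNum: 1,
            ReqNum:        3,
            // illustrative DistSQL, matching the shape used in pressure_test.go
            DistSQLs: []v1alpha1.DistSQL{{SQL: "REGISTER STORAGE UNIT ?", Args: []string{"demo_ds"}}},
        }

        // Run blocks until cfg.Duration elapses or ctx is cancelled; the controller
        // instead launches it with `go` and polls Active on later reconciles.
        p := pressure.NewPressure("steady-check", cfg.DistSQLs)
        p.Run(context.Background(), cfg)

        fmt.Printf("total=%d success=%d duration=%s err=%v\n",
            p.Result.Total, p.Result.Success, p.Result.Duration, p.Err)
    }
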
diff --git a/shardingsphere-operator/pkg/pressure/pressure_test.go b/shardingsphere-operator/pkg/pressure/pressure_test.go
new file mode 100644
index 0000000..ac2b2f3
--- /dev/null
+++ b/shardingsphere-operator/pkg/pressure/pressure_test.go
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pressure
+
+import (
+       "bou.ke/monkey"
+       "context"
+       "database/sql"
+       "github.com/DATA-DOG/go-sqlmock"
+       "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "regexp"
+       "testing"
+       "time"
+)
+
+func TestPressure(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Controllers Suite")
+}
+
+var _ = Describe("test pressure", func() {
+       var (
+               dbmock sqlmock.Sqlmock
+               err    error
+       )
+       BeforeEach(func() {
+               db, dbmock, err = sqlmock.New()
+               Expect(err).To(BeNil())
+               Expect(db).NotTo(BeNil())
+               Expect(dbmock).NotTo(BeNil())
+
+               monkey.Patch(sql.Open, func(driverName, dataSourceName string) (*sql.DB, error) {
+                       return db, nil
+               })
+       })
+
+       AfterEach(func() {
+               monkey.Unpatch(sql.Open)
+               db.Close()
+       })
+
+       Context("test Run function", func() {
+               It("should Run successfully", func() {
+                       registerStorageUnitCase := &v1alpha1.PressureCfg{
+                               ZkHost:   "",
+                               SsHost:   "test",
+                               Duration: metav1.Duration{Duration: 20 * time.Second},
+                               ReqTime:  metav1.Duration{Duration: 5 * time.Second},
+                               DistSQLs: []v1alpha1.DistSQL{
+                                       {
+                                               SQL: "REGISTER STORAGE UNIT ?",
+                                               Args: []string{
+                                                       "**",
+                                               },
+                                       },
+                               },
+                               ConcurrentNum: 2,
+                               ReqNum:        5,
+                       }
+
+                       dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+                       pressure := NewPressure("verify", registerStorageUnitCase.DistSQLs)
+                       pressure.Run(context.TODO(), registerStorageUnitCase)
+
+                       Expect(pressure.Result.Total > 0).To(BeTrue())
+                       Expect(pressure.Result.Success >= 0).To(BeTrue())
+                       Expect(pressure.Result.Total >= pressure.Result.Success).To(BeTrue())
+                       Expect(pressure.Result.Duration.Milliseconds() >= registerStorageUnitCase.Duration.Milliseconds()).To(BeTrue())
+                       Expect(pressure.Active).To(BeFalse())
+               })
+       })
+
+})
