This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new d95b2cd fix(chaos): add IT mocks and fix chaos bugs
new 36e311a Merge pull request #364 from moomman/add-ssChaos-test
d95b2cd is described below
commit d95b2cd714e7bd1e139021d1445c0fa8be801f3b
Author: moonman <[email protected]>
AuthorDate: Thu May 18 14:05:15 2023 +0800
fix(chaos): add IT mocks and fix chaos bugs
---
.../api/v1alpha1/shardingsphere_chaos_types.go | 9 +-
.../api/v1alpha1/zz_generated.deepcopy.go | 13 +-
.../cmd/shardingsphere-operator/manager/option.go | 22 +-
.../controllers/shardingsphere_chaos_controller.go | 264 +++++++++++-------
.../shardingsphere_chaos_controller_test.go | 298 +++++++++++++++++++++
.../pkg/kubernetes/chaosmesh/builder.go | 122 ++++-----
.../pkg/kubernetes/chaosmesh/chaosmesh.go | 22 +-
.../pkg/kubernetes/chaosmesh/mocks/store.go | 1 +
.../pkg/kubernetes/configmap/builders.go | 1 +
shardingsphere-operator/pkg/pressure/pressure.go | 72 ++---
.../pkg/reconcile/shardingspherechaos/job.go | 1 +
shardingsphere-operator/test/e2e/e2e_suite_test.go | 33 +--
.../e2e/shardingsphere_chaos_controller_test.go | 138 +++++++---
13 files changed, 717 insertions(+), 279 deletions(-)
diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index 43431e4..f5321a5 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -59,7 +59,7 @@ type PressureCfg struct {
type DistSQL struct {
SQL string `json:"sql"`
- Args []string `json:"args"`
+ Args []string `json:"args,omitempty"`
}
type Script string
@@ -94,9 +94,10 @@ const (
// ShardingSphereChaosStatus defines the actual state of ShardingSphereChaos
type ShardingSphereChaosStatus struct {
- ChaosCondition ChaosCondition `json:"chaosCondition"`
- Phase ChaosPhase `json:"phase"`
- Result Result `json:"result"`
+ ChaosCondition ChaosCondition `json:"chaosCondition"`
+ Phase ChaosPhase `json:"phase"`
+ Result Result `json:"result"`
+ Conditions []*metav1.Condition `json:"condition,omitempty"`
}
// Result represents the result of the ShardingSphereChaos
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index f2e6b08..48efd64 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -1407,7 +1407,7 @@ func (in *ShardingSphereChaos) DeepCopyInto(out
*ShardingSphereChaos) {
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
- out.Status = in.Status
+ in.Status.DeepCopyInto(&out.Status)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new ShardingSphereChaos.
@@ -1482,6 +1482,17 @@ func (in *ShardingSphereChaosSpec) DeepCopy()
*ShardingSphereChaosSpec {
func (in *ShardingSphereChaosStatus) DeepCopyInto(out
*ShardingSphereChaosStatus) {
*out = *in
out.Result = in.Result
+ if in.Conditions != nil {
+ in, out := &in.Conditions, &out.Conditions
+ *out = make([]*metav1.Condition, len(*in))
+ for i := range *in {
+ if (*in)[i] != nil {
+ in, out := &(*in)[i], &(*out)[i]
+ *out = new(metav1.Condition)
+ (*in).DeepCopyInto(*out)
+ }
+ }
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new ShardingSphereChaosStatus.
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index ded3c83..d25f814 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -21,8 +21,6 @@ import (
"flag"
"strings"
-
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
-
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
sschaos
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
@@ -94,7 +92,7 @@ func ParseOptionsFromCmdFlags() *Options {
flag.BoolVar(&opt.LeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active
controller manager.")
- flag.StringVar(&opt.FeatureGates, "feature-gates",
"ShardingSphereChaos=true", "A set of key=value pairs that describe feature
gates for alpha/experimental features.")
+ flag.StringVar(&opt.FeatureGates, "feature-gates", "", "A set of
key=value pairs that describe feature gates for alpha/experimental features.")
// aws client options
flag.StringVar(&AwsAccessKeyID, "aws-access-key-id", "", "The AWS
access key ID.")
flag.StringVar(&AwsSecretAccessKey, "aws-secret-key", "", "The AWS
secret access key.")
@@ -171,15 +169,15 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
return err
}
if err := (&controllers.ShardingSphereChaosReconciler{
- Client: mgr.GetClient(),
- Scheme: mgr.GetScheme(),
- Log: mgr.GetLogger(),
- Chaos: sschaos.NewChaos(mgr.GetClient()),
- Job: job.NewJob(mgr.GetClient()),
- ExecRecorder: make([]*pressure.Pressure, 0),
- ConfigMap:
configmap.NewConfigMapClient(mgr.GetClient()),
- Events:
mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
- ClientSet: clientset,
+ Client: mgr.GetClient(),
+ Scheme: mgr.GetScheme(),
+ Log: mgr.GetLogger(),
+ Chaos: sschaos.NewChaos(mgr.GetClient()),
+ Job: job.NewJob(mgr.GetClient()),
+ ExecCtrls: make([]*controllers.ExecCtrl, 0),
+ ConfigMap:
configmap.NewConfigMapClient(mgr.GetClient()),
+ Events:
mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
+ ClientSet: clientset,
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller",
"controller", "ShardingSphereChaos")
return err
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 089abfe..c512ccf 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -19,29 +19,29 @@ package controllers
import (
"context"
- "errors"
"fmt"
-
-
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
-
- "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+ "reflect"
+ "time"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
- sschaos
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/job"
- reconcile
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/shardingspherechaos"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
+ sschaos
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/shardingspherechaos"
"github.com/go-logr/logr"
batchV1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
const (
@@ -49,17 +49,6 @@ const (
SSChaosFinalizeName =
"shardingsphere.apache.org/finalizer"
)
-type JobCondition string
-
-var (
- CompleteJob JobCondition = "complete"
- FailureJob JobCondition = "failure"
- SuspendJob JobCondition = "suspend"
- ActiveJob JobCondition = "active"
-
- ErrNoPod = errors.New("no pod in list")
-)
-
// ShardingSphereChaosReconciler is a controller for the ShardingSphereChaos
type ShardingSphereChaosReconciler struct {
client.Client
@@ -69,10 +58,16 @@ type ShardingSphereChaosReconciler struct {
Events record.EventRecorder
ClientSet *clientset.Clientset
- Chaos sschaos.Chaos
- Job job.Job
- ExecRecorder []*pressure.Pressure
- ConfigMap configmap.ConfigMap
+ Chaos chaosmesh.Chaos
+ Job job.Job
+ ExecCtrls []*ExecCtrl
+ ConfigMap configmap.ConfigMap
+}
+
+type ExecCtrl struct {
+ cancel context.CancelFunc
+ pressure *pressure.Pressure
+ ctx context.Context
}
// Reconcile handles main function of this controller
@@ -82,48 +77,43 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx
context.Context, req ctrl.
ssChaos, err := r.getRuntimeChaos(ctx, req.NamespacedName)
if err != nil {
- if apierrors.IsNotFound(err) {
- return ctrl.Result{RequeueAfter: defaultRequeueTime},
nil
- }
-
- logger.Error(err, "failed to get the shardingsphere chaos")
- return ctrl.Result{Requeue: true}, err
+ return ctrl.Result{}, client.IgnoreNotFound(err)
}
- if err := r.finalize(ctx, ssChaos); err != nil {
- return ctrl.Result{Requeue: true}, err
+ logger.Info("start reconcile shardingspherechaos")
+ if ssChaos.ObjectMeta.DeletionTimestamp.IsZero() {
+ if !controllerutil.ContainsFinalizer(ssChaos,
SSChaosFinalizeName) {
+ controllerutil.AddFinalizer(ssChaos,
SSChaosFinalizeName)
+ if err := r.Update(ctx, ssChaos); err != nil {
+ return ctrl.Result{}, err
+ }
+ }
+ } else if controllerutil.ContainsFinalizer(ssChaos,
SSChaosFinalizeName) {
+ return r.finalize(ctx, ssChaos)
}
- logger.Info("start reconcile chaos")
-
- //TODO: consider merge these events
var errors []error
if err := r.reconcileChaos(ctx, ssChaos); err != nil {
- if err != nil {
- errors = append(errors, err)
- }
+ errors = append(errors, err)
+
logger.Error(err, "reconcile shardingspherechaos error")
r.Events.Event(ssChaos, "Warning", "shardingspherechaos error",
err.Error())
}
if err := r.reconcileConfigMap(ctx, ssChaos); err != nil {
- if err != nil {
- errors = append(errors, err)
- }
+ errors = append(errors, err)
+
logger.Error(err, "reconcile configmap error")
r.Events.Event(ssChaos, "Warning", "configmap error",
err.Error())
}
if err := r.reconcilePressure(ctx, ssChaos); err != nil {
- if err != nil {
- errors = append(errors, err)
- }
+ errors = append(errors, err)
}
if err := r.reconcileStatus(ctx, ssChaos); err != nil {
- if err != nil {
- errors = append(errors, err)
- }
+ errors = append(errors, err)
+
logger.Error(err, "failed to update status")
}
if len(errors) > 0 {
@@ -133,42 +123,73 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx
context.Context, req ctrl.
}
func (r *ShardingSphereChaosReconciler) reconcilePressure(ctx context.Context,
chao *v1alpha1.ShardingSphereChaos) error {
+
+ if chao.Status.Phase == "" {
+ return nil
+ }
exec := r.getNeedExec(chao)
//if exec in this phase do not exist,create it
if exec == nil {
exec := pressure.NewPressure(getExecName(chao),
chao.Spec.PressureCfg.DistSQLs)
- go exec.Run(ctx, &chao.Spec.PressureCfg)
- r.ExecRecorder = append(r.ExecRecorder, exec)
+
+ //we need to set active to true,prevent it start after we start
reconcile status
+ exec.Active = true
+ execCtx, cancel := context.WithCancel(ctx)
+ execCtrl := &ExecCtrl{
+ cancel: cancel,
+ pressure: exec,
+ ctx: execCtx,
+ }
+
+ go exec.Run(execCtx, &chao.Spec.PressureCfg)
+ r.ExecCtrls = append(r.ExecCtrls, execCtrl)
}
return nil
}
func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context,
chaos *v1alpha1.ShardingSphereChaos) error {
- namespacedName := types.NamespacedName{
- Name: chaos.Name,
- Namespace: chaos.Namespace,
- }
+
+ cur := chaos.Status.DeepCopy()
setDefaultStatus(chaos)
+
+ updateCondition(chaos)
+
r.updatePhaseExec(chaos)
if err := r.updateChaosCondition(ctx, chaos); err != nil {
return err
}
- rt, err := r.getRuntimeChaos(ctx, namespacedName)
- if err != nil {
- return err
+ if reflect.DeepEqual(cur, chaos.Status) {
+
+ return nil
}
- rt.Status = chaos.Status
- return r.Status().Update(ctx, rt)
+ return r.Status().Update(ctx, chaos)
+}
+
+func updateCondition(chaos *v1alpha1.ShardingSphereChaos) {
+ phase := chaos.Status.Phase
+
+ for i := range chaos.Status.Conditions {
+ condition := chaos.Status.Conditions[i]
+ if string(phase) == condition.Type {
+ if condition.Status == v1alpha1.ConditionStatusFalse {
+ condition.Status = v1alpha1.ConditionStatusTrue
+ condition.LastTransitionTime =
metav1.Time{Time: time.Now()}
+ }
+ return
+ }
+ }
}
func (r *ShardingSphereChaosReconciler) updatePhaseExec(chaos
*v1alpha1.ShardingSphereChaos) {
exec := r.getNeedExec(chaos)
+
+ //because the goroutine asynchronous,we cant check it start immediately
or not
if exec == nil || exec.Active {
return
}
@@ -176,28 +197,36 @@ func (r *ShardingSphereChaosReconciler)
updatePhaseExec(chaos *v1alpha1.Sharding
//todo: judge error
msg := generateMsgFromExec(exec)
+ var nextPhase v1alpha1.ChaosPhase
//when exec finished, update phase
switch chaos.Status.Phase {
case v1alpha1.BeforeSteady:
+ nextPhase = v1alpha1.AfterSteady
+ case v1alpha1.AfterSteady:
chaos.Status.Result.Steady = *msg
- chaos.Status.Phase = v1alpha1.BeforeChaos
+ //todo: add metrics
+
+ nextPhase = v1alpha1.BeforeChaos
case v1alpha1.BeforeChaos:
chaos.Status.Result.Chaos = *msg
- chaos.Status.Phase = v1alpha1.AfterChaos
+ //todo: add metrics
+
+ nextPhase = v1alpha1.AfterChaos
+ //case v1alpha1.AfterChaos:
+ // //todo: check result here
+ // return
+ default:
+ return
}
+ chaos.Status.Phase = nextPhase
}
func generateMsgFromExec(exec *pressure.Pressure) *v1alpha1.Msg {
//todo: wait to change result compute way
- rate := 0
- if exec.Result.Total == 0 {
- rate = 0
- } else {
- rate = exec.Result.Success / exec.Result.Total
- }
+
msg := v1alpha1.Msg{
- Result: fmt.Sprintf("%d", rate),
+ Result: fmt.Sprintf("%d/%d", exec.Result.Success,
exec.Result.Total),
Duration: exec.Result.Duration.String(),
}
if exec.Err != nil {
@@ -209,23 +238,29 @@ func generateMsgFromExec(exec *pressure.Pressure)
*v1alpha1.Msg {
func getExecName(chao *v1alpha1.ShardingSphereChaos) string {
var execName string
+ nameSpacedName := types.NamespacedName{Namespace: chao.Namespace, Name:
chao.Name}
+
if chao.Status.Phase == v1alpha1.BeforeSteady || chao.Status.Phase ==
v1alpha1.AfterSteady {
- execName = reconcile.MakeJobName(chao.Name, reconcile.InSteady)
+ execName = makeExecName(nameSpacedName,
string(sschaos.InSteady))
}
if chao.Status.Phase == v1alpha1.BeforeChaos || chao.Status.Phase ==
v1alpha1.AfterChaos {
- execName = reconcile.MakeJobName(chao.Name, reconcile.InChaos)
+ execName = makeExecName(nameSpacedName, string(sschaos.InChaos))
}
return execName
}
+func makeExecName(namespacedName types.NamespacedName, execType string) string
{
+ return fmt.Sprintf("%s-%s-%s", namespacedName.Namespace,
namespacedName.Name, execType)
+}
+
func (r *ShardingSphereChaosReconciler) getNeedExec(chao
*v1alpha1.ShardingSphereChaos) *pressure.Pressure {
jobName := getExecName(chao)
//if pressure do not exist,run it
- for i := range r.ExecRecorder {
- if r.ExecRecorder[i].Name == jobName {
- return r.ExecRecorder[i]
+ for i := range r.ExecCtrls {
+ if r.ExecCtrls[i].pressure.Name == jobName {
+ return r.ExecCtrls[i].pressure
}
}
@@ -239,26 +274,22 @@ func (r *ShardingSphereChaosReconciler)
getRuntimeChaos(ctx context.Context, nam
}
// nolint:nestif
-func (r *ShardingSphereChaosReconciler) finalize(ctx context.Context, chao
*v1alpha1.ShardingSphereChaos) error {
- if chao.ObjectMeta.DeletionTimestamp.IsZero() {
- if !controllerutil.ContainsFinalizer(chao, SSChaosFinalizeName)
{
- controllerutil.AddFinalizer(chao, SSChaosFinalizeName)
- if err := r.Update(ctx, chao); err != nil {
- return err
- }
- }
- } else if controllerutil.ContainsFinalizer(chao, SSChaosFinalizeName) {
- if err := r.deleteExternalResources(ctx, chao); err != nil {
- return err
- }
+func (r *ShardingSphereChaosReconciler) finalize(ctx context.Context, ssChaos
*v1alpha1.ShardingSphereChaos) (ctrl.Result, error) {
+ namespacedName := types.NamespacedName{
+ Namespace: ssChaos.Namespace,
+ Name: ssChaos.Name,
+ }
+ r.deleteExec(namespacedName)
+ if err := r.deleteExternalResources(ctx, ssChaos); err != nil {
+ return ctrl.Result{}, err
+ }
- controllerutil.RemoveFinalizer(chao, SSChaosFinalizeName)
- if err := r.Update(ctx, chao); err != nil {
- return err
- }
+ controllerutil.RemoveFinalizer(ssChaos, SSChaosFinalizeName)
+ if err := r.Update(ctx, ssChaos); err != nil {
+ return ctrl.Result{}, err
}
- return nil
+ return ctrl.Result{}, nil
}
func (r *ShardingSphereChaosReconciler) deleteExternalResources(ctx
context.Context, chao *v1alpha1.ShardingSphereChaos) error {
@@ -282,6 +313,20 @@ func (r *ShardingSphereChaosReconciler)
deleteExternalResources(ctx context.Cont
return nil
}
+func (r *ShardingSphereChaosReconciler) deleteExec(namespacedName
types.NamespacedName) {
+ steady, chaos := makeExecName(namespacedName,
string(sschaos.InSteady)), makeExecName(namespacedName, string(sschaos.InChaos))
+ execR := make([]*ExecCtrl, 0, len(r.ExecCtrls))
+ for i := range r.ExecCtrls {
+ exec := r.ExecCtrls[i].pressure
+ if exec.Name == steady || exec.Name == chaos {
+ r.ExecCtrls[i].cancel()
+ continue
+ }
+ execR = append(execR, r.ExecCtrls[i])
+ }
+ r.ExecCtrls = execR
+}
+
func (r *ShardingSphereChaosReconciler) deletePodChaos(ctx context.Context,
namespacedName types.NamespacedName) error {
podchao, err := r.getPodChaosByNamespacedName(ctx, namespacedName)
if err != nil {
@@ -316,7 +361,6 @@ func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx
context.Context, chao
if chaos.Status.Phase == "" || chaos.Status.Phase ==
v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
return nil
}
-
namespacedName := types.NamespacedName{
Namespace: chaos.Namespace,
Name: chaos.Name,
@@ -351,7 +395,7 @@ func (r *ShardingSphereChaosReconciler)
reconcilePodChaos(ctx context.Context, c
return r.createPodChaos(ctx, chaos)
}
-func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx
context.Context, namespacedName types.NamespacedName) (sschaos.PodChaos, error)
{
+func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx
context.Context, namespacedName types.NamespacedName) (chaosmesh.PodChaos,
error) {
pc, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName)
if err != nil {
return nil, err
@@ -368,7 +412,7 @@ func (r *ShardingSphereChaosReconciler) createPodChaos(ctx
context.Context, chao
return nil
}
-func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context,
chaos *v1alpha1.ShardingSphereChaos, podChaos sschaos.PodChaos) error {
+func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context,
chaos *v1alpha1.ShardingSphereChaos, podChaos chaosmesh.PodChaos) error {
err := r.Chaos.UpdatePodChaos(ctx, podChaos, chaos)
if err != nil {
return err
@@ -389,7 +433,7 @@ func (r *ShardingSphereChaosReconciler)
reconcileNetworkChaos(ctx context.Contex
return r.createNetworkChaos(ctx, chaos)
}
-func (r *ShardingSphereChaosReconciler) updateNetWorkChaos(ctx
context.Context, chaos *v1alpha1.ShardingSphereChaos, networkChaos
sschaos.NetworkChaos) error {
+func (r *ShardingSphereChaosReconciler) updateNetWorkChaos(ctx
context.Context, chaos *v1alpha1.ShardingSphereChaos, networkChaos
chaosmesh.NetworkChaos) error {
err := r.Chaos.UpdateNetworkChaos(ctx, networkChaos, chaos)
if err != nil {
return err
@@ -435,9 +479,39 @@ func (r *ShardingSphereChaosReconciler)
reconcileConfigMap(ctx context.Context,
}
func setDefaultStatus(chaos *v1alpha1.ShardingSphereChaos) {
+
if chaos.Status.Phase == "" {
chaos.Status.Phase = v1alpha1.BeforeSteady
}
+
+ if len(chaos.Status.Conditions) == 0 {
+ chaos.Status.Conditions = []*metav1.Condition{
+ {
+ Type:
string(v1alpha1.BeforeSteady),
+ Status: metav1.ConditionTrue,
+ LastTransitionTime: metav1.Time{Time:
time.Now()},
+ Reason: "InSteadyExperiment",
+ },
+ {
+ Type:
string(v1alpha1.AfterSteady),
+ Status: metav1.ConditionFalse,
+ LastTransitionTime: metav1.Time{Time:
time.Now()},
+ Reason: "AfterSteadyExperiment",
+ },
+ {
+ Type:
string(v1alpha1.BeforeChaos),
+ Status: metav1.ConditionFalse,
+ LastTransitionTime: metav1.Time{Time:
time.Now()},
+ Reason: "InChaoExperiment",
+ },
+ {
+ Type: string(v1alpha1.AfterChaos),
+ Status: metav1.ConditionFalse,
+ LastTransitionTime: metav1.Time{Time:
time.Now()},
+ Reason: "AfterChaosExperiment",
+ },
+ }
+ }
}
func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx
context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
@@ -451,7 +525,7 @@ func (r *ShardingSphereChaosReconciler)
updateChaosCondition(ctx context.Context
if err != nil {
return err
}
- chaos.Status.ChaosCondition = sschaos.ConvertChaosStatus(ctx,
chaos, pc)
+ chaos.Status.ChaosCondition = chaosmesh.ConvertChaosStatus(ctx,
chaos, pc)
}
if chaos.Spec.EmbedChaos.NetworkChaos != nil {
@@ -459,13 +533,13 @@ func (r *ShardingSphereChaosReconciler)
updateChaosCondition(ctx context.Context
if err != nil {
return err
}
- chaos.Status.ChaosCondition = sschaos.ConvertChaosStatus(ctx,
chaos, nc)
+ chaos.Status.ChaosCondition = chaosmesh.ConvertChaosStatus(ctx,
chaos, nc)
}
return nil
}
-func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx
context.Context, namespacedName types.NamespacedName) (sschaos.NetworkChaos,
error) {
+func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx
context.Context, namespacedName types.NamespacedName) (chaosmesh.NetworkChaos,
error) {
nc, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName)
if err != nil {
return nil, err
@@ -483,7 +557,7 @@ func (r *ShardingSphereChaosReconciler)
getConfigMapByNamespacedName(ctx context
}
func (r *ShardingSphereChaosReconciler) updateConfigMap(ctx context.Context,
chaos *v1alpha1.ShardingSphereChaos, cur *corev1.ConfigMap) error {
- // exp := reconcile.UpdateShardingSphereChaosConfigMap(chao, cur)
+ // exp := sschaos.UpdateShardingSphereChaosConfigMap(chao, cur)
exp := r.ConfigMap.Build(ctx, chaos)
exp.ObjectMeta = cur.ObjectMeta
exp.ObjectMeta.ResourceVersion = ""
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller_test.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller_test.go
new file mode 100644
index 0000000..0fe1ef3
--- /dev/null
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller_test.go
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controllers
+
+import (
+ "context"
+ "database/sql"
+ "regexp"
+ "time"
+
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+ mockChaos
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
+ reconcile
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/shardingspherechaos"
+
+ "bou.ke/monkey"
+ "github.com/DATA-DOG/go-sqlmock"
+ "github.com/golang/mock/gomock"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/tools/record"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/client/fake"
+ logf "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+func mockchaosStub(chaos *mockChaos.MockChaos) {
+ chaos.EXPECT().NewNetworkChaos(gomock.Any(),
gomock.Any()).Return(gomock.Any()).AnyTimes()
+ chaos.EXPECT().NewPodChaos(gomock.Any(),
gomock.Any()).Return(gomock.Any()).AnyTimes()
+
+ chaos.EXPECT().CreatePodChaos(gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+ chaos.EXPECT().CreateNetworkChaos(gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+
+ chaos.EXPECT().DeleteNetworkChaos(gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+ chaos.EXPECT().DeletePodChaos(gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+
+ chaos.EXPECT().UpdatePodChaos(gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+ chaos.EXPECT().UpdateNetworkChaos(gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+
+ chaos.EXPECT().GetNetworkChaosByNamespacedName(gomock.Any(),
gomock.Any()).Return(gomock.Any(), nil).AnyTimes()
+ chaos.EXPECT().GetPodChaosByNamespacedName(gomock.Any(),
gomock.Any()).Return(gomock.Any(), nil).AnyTimes()
+}
+
+func mockDBStub(mock sqlmock.Sqlmock) {
+ mock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+}
+
+var _ = Describe("shardingsphere mock test", func() {
+ var (
+ testNamespacedName = types.NamespacedName{
+ Namespace: "test-ssChaos-namespace",
+ Name: "test-ssChaos-name",
+ }
+ duration = "30s"
+ )
+ var (
+ ctx = context.TODO()
+ fakeClient client.Client
+ reconciler *ShardingSphereChaosReconciler
+ mockCtrl *gomock.Controller
+ mockchaos *mockChaos.MockChaos
+ db *sql.DB
+ )
+
+ BeforeEach(func() {
+ scheme := runtime.NewScheme()
+ Expect(clientgoscheme.AddToScheme(scheme)).To(Succeed())
+ Expect(v1alpha1.AddToScheme(scheme)).To(Succeed())
+ fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build()
+ mockCtrl = gomock.NewController(GinkgoT())
+
+ mockchaos = mockChaos.NewMockChaos(mockCtrl)
+ mockchaosStub(mockchaos)
+
+ reconciler = &ShardingSphereChaosReconciler{
+ Client: fakeClient,
+ Scheme: scheme,
+ Log: logf.Log,
+ Events: record.NewFakeRecorder(100),
+ Chaos: mockchaos,
+ ExecCtrls: make([]*ExecCtrl, 0),
+ ConfigMap: configmap.NewConfigMapClient(fakeClient),
+ }
+
+ var (
+ dbmock sqlmock.Sqlmock
+ err error
+ )
+
+ db, dbmock, err = sqlmock.New()
+ Expect(err).To(BeNil())
+ Expect(db).NotTo(BeNil())
+ Expect(dbmock).NotTo(BeNil())
+
+ monkey.Patch(sql.Open, func(driverName, dataSourceName string)
(*sql.DB, error) {
+ return db, nil
+ })
+
+ mockDBStub(dbmock)
+ })
+
+ AfterEach(func() {
+ monkey.UnpatchAll()
+ db.Close()
+ })
+
+ Context("create shardingsphere chaos", func() {
+ It("should create successfully", func() {
+ ssChaos := &v1alpha1.ShardingSphereChaos{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testNamespacedName.Name,
+ Namespace: testNamespacedName.Namespace,
+ },
+ Spec: v1alpha1.ShardingSphereChaosSpec{
+ EmbedChaos: v1alpha1.EmbedChaos{
+ PodChaos:
&v1alpha1.PodChaosSpec{
+ PodSelector:
v1alpha1.PodSelector{
+ LabelSelectors:
map[string]string{
+
"app.kubernetes.io/component": "zookeeper",
+ },
+ },
+ Action:
v1alpha1.PodFailure,
+ Params:
v1alpha1.PodChaosParams{
+ PodFailure:
&v1alpha1.PodFailureParams{
+
Duration: &duration,
+ },
+ },
+ },
+ },
+ PressureCfg: v1alpha1.PressureCfg{
+ SsHost: "127.0.0.1:3306/ds_1",
+ Duration:
metav1.Duration{Duration: 30 * time.Second},
+ ReqTime:
metav1.Duration{Duration: 30 * time.Second},
+ DistSQLs: []v1alpha1.DistSQL{
+ {
+ SQL: "REGISTER
STORAGE UNIT ?()",
+ Args:
[]string{"ds_1"},
+ },
+ },
+ ConcurrentNum: 2,
+ ReqNum: 5,
+ },
+ },
+ Status: v1alpha1.ShardingSphereChaosStatus{},
+ }
+
+ Expect(fakeClient.Create(ctx,
ssChaos)).Should(Succeed())
+ chaos := &v1alpha1.ShardingSphereChaos{}
+ _, err := reconciler.Reconcile(ctx,
ctrl.Request{NamespacedName: testNamespacedName})
+ Expect(err).To(BeNil())
+ Expect(fakeClient.Get(ctx, testNamespacedName,
chaos)).Should(Succeed())
+ Expect(fakeClient.Delete(ctx, chaos)).Should(Succeed())
+ })
+ })
+
+ Context("reconcile ssChaos in BeforeSteady phase", func() {
+ It("chaos should be nil,execRecorder should be steady", func() {
+ ssChaos := &v1alpha1.ShardingSphereChaos{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testNamespacedName.Name,
+ Namespace: testNamespacedName.Namespace,
+ },
+ Spec: v1alpha1.ShardingSphereChaosSpec{
+ EmbedChaos: v1alpha1.EmbedChaos{
+ PodChaos:
&v1alpha1.PodChaosSpec{
+ PodSelector:
v1alpha1.PodSelector{
+ LabelSelectors:
map[string]string{
+
"app.kubernetes.io/component": "zookeeper",
+ },
+ },
+ Action:
v1alpha1.PodFailure,
+ Params:
v1alpha1.PodChaosParams{
+ PodFailure:
&v1alpha1.PodFailureParams{
+
Duration: &duration,
+ },
+ },
+ },
+ },
+ PressureCfg: v1alpha1.PressureCfg{
+ SsHost: "127.0.0.1:3306/ds_1",
+ Duration:
metav1.Duration{Duration: 30 * time.Second},
+ ReqTime:
metav1.Duration{Duration: 30 * time.Second},
+ DistSQLs: []v1alpha1.DistSQL{
+ {
+ SQL: "REGISTER
STORAGE UNIT ?()",
+ Args:
[]string{"ds_1"},
+ },
+ },
+ ConcurrentNum: 2,
+ ReqNum: 5,
+ },
+ },
+ Status: v1alpha1.ShardingSphereChaosStatus{},
+ }
+
+ Expect(fakeClient.Create(ctx,
ssChaos)).Should(Succeed())
+ for i := 0; i < 5; i++ {
+ _, err := reconciler.Reconcile(ctx,
ctrl.Request{NamespacedName: testNamespacedName})
+ Expect(err).To(BeNil())
+ }
+
+ var inSteadyChaos v1alpha1.ShardingSphereChaos
+ Expect(fakeClient.Get(ctx, testNamespacedName,
&inSteadyChaos)).Should(Succeed())
+
Expect(inSteadyChaos.Status.Phase).To(Equal(v1alpha1.BeforeSteady))
+
+ Expect(len(reconciler.ExecCtrls)).To(Equal(1))
+ Expect(fakeClient.Delete(ctx,
&inSteadyChaos)).Should(Succeed())
+ })
+ })
+
+ Context("reconcile ssChaos in BeforeChaos", func() {
+ It("phase should in beforeChaos,execRecorder should gt 2",
func() {
+ ssChaos := &v1alpha1.ShardingSphereChaos{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testNamespacedName.Name,
+ Namespace: testNamespacedName.Namespace,
+ },
+ Spec: v1alpha1.ShardingSphereChaosSpec{
+ EmbedChaos: v1alpha1.EmbedChaos{
+ PodChaos:
&v1alpha1.PodChaosSpec{
+ PodSelector:
v1alpha1.PodSelector{
+ LabelSelectors:
map[string]string{
+
"app.kubernetes.io/component": "zookeeper",
+ },
+ },
+ Action:
v1alpha1.PodFailure,
+ Params:
v1alpha1.PodChaosParams{
+ PodFailure:
&v1alpha1.PodFailureParams{
+
Duration: &duration,
+ },
+ },
+ },
+ },
+ PressureCfg: v1alpha1.PressureCfg{
+ SsHost: "127.0.0.1:3306/ds_1",
+ Duration:
metav1.Duration{Duration: 30 * time.Second},
+ ReqTime:
metav1.Duration{Duration: 30 * time.Second},
+ DistSQLs: []v1alpha1.DistSQL{
+ {
+ SQL: "REGISTER
STORAGE UNIT ?()",
+ Args:
[]string{"ds_1"},
+ },
+ },
+ ConcurrentNum: 2,
+ ReqNum: 5,
+ },
+ },
+ Status: v1alpha1.ShardingSphereChaosStatus{
+ Phase: v1alpha1.BeforeChaos,
+ },
+ }
+
+ Expect(fakeClient.Create(ctx,
ssChaos)).Should(Succeed())
+ var chao v1alpha1.ShardingSphereChaos
+ Expect(fakeClient.Get(ctx, testNamespacedName,
&chao)).Should(Succeed())
+ steadyExec :=
pressure.NewPressure(reconcile.MakeJobName(ssChaos.Name, reconcile.InSteady),
ssChaos.Spec.PressureCfg.DistSQLs)
+ steadyExec.Active = false
+ execCtx, cancel := context.WithCancel(ctx)
+ execCtrl := ExecCtrl{
+ cancel: cancel,
+ pressure: steadyExec,
+ ctx: execCtx,
+ }
+ reconciler.ExecCtrls = append(reconciler.ExecCtrls,
&execCtrl)
+
+ for i := 0; i < 10; i++ {
+ _, err := reconciler.Reconcile(ctx,
ctrl.Request{NamespacedName: testNamespacedName})
+ Expect(err).To(BeNil())
+ }
+ Expect(len(reconciler.ExecCtrls)).To(Equal(2))
+
+ var inChaosChaos v1alpha1.ShardingSphereChaos
+ Expect(fakeClient.Get(ctx, testNamespacedName,
&inChaosChaos)).Should(Succeed())
+
Expect(inChaosChaos.Status.Phase).To(Equal(v1alpha1.BeforeChaos))
+ })
+ })
+})
diff --git a/shardingsphere-operator/pkg/kubernetes/chaosmesh/builder.go b/shardingsphere-operator/pkg/kubernetes/chaosmesh/builder.go
index 16ad0fb..3cc44fc 100644
--- a/shardingsphere-operator/pkg/kubernetes/chaosmesh/builder.go
+++ b/shardingsphere-operator/pkg/kubernetes/chaosmesh/builder.go
@@ -24,7 +24,7 @@ import (
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
- chaosv1alpha1 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
+ chaosmeshv1alpha1 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -57,20 +57,20 @@ var (
type GenericChaos interface{}
func ConvertChaosStatus(ctx context.Context, ssChaos
*v1alpha1.ShardingSphereChaos, chaos GenericChaos) v1alpha1.ChaosCondition {
- var status chaosv1alpha1.ChaosStatus
+ var status chaosmeshv1alpha1.ChaosStatus
if ssChaos.Spec.EmbedChaos.PodChaos != nil {
- if podChao, ok := chaos.(*chaosv1alpha1.PodChaos); ok &&
podChao != nil {
+ if podChao, ok := chaos.(*chaosmeshv1alpha1.PodChaos); ok &&
podChao != nil {
status = *podChao.GetStatus()
} else {
return v1alpha1.Unknown
}
} else if ssChaos.Spec.EmbedChaos.NetworkChaos != nil {
- if networkChaos, ok := chaos.(*chaosv1alpha1.NetworkChaos); ok
&& networkChaos != nil {
+ if networkChaos, ok := chaos.(*chaosmeshv1alpha1.NetworkChaos);
ok && networkChaos != nil {
status = *networkChaos.GetStatus()
}
return v1alpha1.Unknown
}
- var conditions = map[chaosv1alpha1.ChaosConditionType]bool{}
+ var conditions = map[chaosmeshv1alpha1.ChaosConditionType]bool{}
for i := range status.Conditions {
conditions[status.Conditions[i].Type] =
status.Conditions[i].Status == corev1.ConditionTrue
}
@@ -78,21 +78,21 @@ func ConvertChaosStatus(ctx context.Context, ssChaos
*v1alpha1.ShardingSphereCha
return judgeCondition(conditions, status.Experiment.DesiredPhase)
}
-func judgeCondition(condition map[chaosv1alpha1.ChaosConditionType]bool, phase
chaosv1alpha1.DesiredPhase) v1alpha1.ChaosCondition {
+func judgeCondition(condition map[chaosmeshv1alpha1.ChaosConditionType]bool,
phase chaosmeshv1alpha1.DesiredPhase) v1alpha1.ChaosCondition {
- if condition[chaosv1alpha1.ConditionPaused] {
- if !condition[chaosv1alpha1.ConditionSelected] {
+ if condition[chaosmeshv1alpha1.ConditionPaused] {
+ if !condition[chaosmeshv1alpha1.ConditionSelected] {
return v1alpha1.NoTarget
}
return v1alpha1.Paused
}
- if condition[chaosv1alpha1.ConditionSelected] {
- if condition[chaosv1alpha1.ConditionAllRecovered] && phase ==
chaosv1alpha1.StoppedPhase {
+ if condition[chaosmeshv1alpha1.ConditionSelected] {
+ if condition[chaosmeshv1alpha1.ConditionAllRecovered] && phase
== chaosmeshv1alpha1.StoppedPhase {
return v1alpha1.AllRecovered
}
- if condition[chaosv1alpha1.ConditionAllInjected] && phase ==
chaosv1alpha1.RunningPhase {
+ if condition[chaosmeshv1alpha1.ConditionAllInjected] && phase
== chaosmeshv1alpha1.RunningPhase {
return v1alpha1.AllInjected
}
}
@@ -107,7 +107,7 @@ func NewPodChaos(ssChao *v1alpha1.ShardingSphereChaos)
(PodChaos, error) {
chao := ssChao.Spec.PodChaos
if act, ok := ssChao.Annotations[podAction]; ok {
pcb.SetAction(act)
- if gp, ok := ssChao.Annotations[gracePeriod];
chaosv1alpha1.PodChaosAction(act) == chaosv1alpha1.PodKillAction && ok {
+ if gp, ok := ssChao.Annotations[gracePeriod];
chaosmeshv1alpha1.PodChaosAction(act) == chaosmeshv1alpha1.PodKillAction && ok {
gpInt, err := strconv.ParseInt(gp, 10, 64)
if err != nil {
return nil, err
@@ -130,7 +130,7 @@ func NewPodChaos(ssChao *v1alpha1.ShardingSphereChaos)
(PodChaos, error) {
psb.SetSelectMode(ssChao.Annotations[podSelectorMode]).
SetValue(ssChao.Annotations[podSelectorValue])
- containerSelector := &chaosv1alpha1.ContainerSelector{
+ containerSelector := &chaosmeshv1alpha1.ContainerSelector{
PodSelector: *psb.Build(),
}
@@ -194,34 +194,34 @@ func NewNetworkChaos(ssChao
*v1alpha1.ShardingSphereChaos) (NetworkChaos, error)
ncb.SetDevice(ssChao.Annotations[device]).
SetTargetDevice(ssChao.Annotations[targetDevice])
- tcParams := &chaosv1alpha1.TcParameter{}
+ tcParams := &chaosmeshv1alpha1.TcParameter{}
if chao.Action == v1alpha1.Delay {
- tcParams.Delay = &chaosv1alpha1.DelaySpec{
+ tcParams.Delay = &chaosmeshv1alpha1.DelaySpec{
Latency: chao.Params.Delay.Latency,
Jitter: chao.Params.Delay.Jitter,
}
}
if chao.Action == v1alpha1.Corruption {
- tcParams.Corrupt = &chaosv1alpha1.CorruptSpec{
+ tcParams.Corrupt = &chaosmeshv1alpha1.CorruptSpec{
Corrupt: chao.Params.Corruption.Corruption,
}
}
if chao.Action == v1alpha1.Duplication {
- tcParams.Duplicate = &chaosv1alpha1.DuplicateSpec{
+ tcParams.Duplicate = &chaosmeshv1alpha1.DuplicateSpec{
Duplicate: chao.Params.Duplication.Duplication,
}
}
if chao.Action == v1alpha1.Loss {
- tcParams.Loss = &chaosv1alpha1.LossSpec{
+ tcParams.Loss = &chaosmeshv1alpha1.LossSpec{
Loss: chao.Params.Loss.Loss,
}
}
- if chaosv1alpha1.NetworkChaosAction(act) ==
chaosv1alpha1.BandwidthAction {
+ if chaosmeshv1alpha1.NetworkChaosAction(act) ==
chaosmeshv1alpha1.BandwidthAction {
bwab := NewBandWidthActionBuilder()
if ind1, ok := ssChao.Annotations[rate]; ok {
bwab.SetRate(ind1)
@@ -253,11 +253,11 @@ type PodChaosBuilder interface {
SetName(string) PodChaosBuilder
SetLabels(map[string]string) PodChaosBuilder
SetAnnotations(map[string]string) PodChaosBuilder
- SetContainerSelector(*chaosv1alpha1.ContainerSelector) PodChaosBuilder
+ SetContainerSelector(*chaosmeshv1alpha1.ContainerSelector)
PodChaosBuilder
SetAction(string) PodChaosBuilder
SetDuration(*string) PodChaosBuilder
SetGracePeriod(int64) PodChaosBuilder
- Build() *chaosv1alpha1.PodChaos
+ Build() *chaosmeshv1alpha1.PodChaos
}
func NewPodChaosBuilder() PodChaosBuilder {
@@ -267,7 +267,7 @@ func NewPodChaosBuilder() PodChaosBuilder {
}
type podChaosBuilder struct {
- podChaos *chaosv1alpha1.PodChaos
+ podChaos *chaosmeshv1alpha1.PodChaos
}
type BandWidthActionBuilder interface {
@@ -276,17 +276,17 @@ type BandWidthActionBuilder interface {
SetBuffer(string) BandWidthActionBuilder
SetPeakRate(string) BandWidthActionBuilder
SetMinBurst(string) BandWidthActionBuilder
- Build() *chaosv1alpha1.BandwidthSpec
+ Build() *chaosmeshv1alpha1.BandwidthSpec
}
func NewBandWidthActionBuilder() BandWidthActionBuilder {
return &bandWidthActionBuilder{
- bandwidth: &chaosv1alpha1.BandwidthSpec{},
+ bandwidth: &chaosmeshv1alpha1.BandwidthSpec{},
}
}
type bandWidthActionBuilder struct {
- bandwidth *chaosv1alpha1.BandwidthSpec
+ bandwidth *chaosmeshv1alpha1.BandwidthSpec
}
func (b *bandWidthActionBuilder) SetRate(s string) BandWidthActionBuilder {
@@ -328,7 +328,7 @@ func (b *bandWidthActionBuilder) SetMinBurst(s string)
BandWidthActionBuilder {
return b
}
-func (b *bandWidthActionBuilder) Build() *chaosv1alpha1.BandwidthSpec {
+func (b *bandWidthActionBuilder) Build() *chaosmeshv1alpha1.BandwidthSpec {
return b.bandwidth
}
@@ -352,22 +352,22 @@ func (p *podChaosBuilder) SetAnnotations(annotations
map[string]string) PodChaos
return p
}
-func (p *podChaosBuilder) SetContainerSelector(selector
*chaosv1alpha1.ContainerSelector) PodChaosBuilder {
+func (p *podChaosBuilder) SetContainerSelector(selector
*chaosmeshv1alpha1.ContainerSelector) PodChaosBuilder {
p.podChaos.Spec.ContainerSelector = *selector
return p
}
func (p *podChaosBuilder) SetAction(action string) PodChaosBuilder {
if v1alpha1.PodChaosAction(action) == v1alpha1.PodFailure {
- p.podChaos.Spec.Action = chaosv1alpha1.PodFailureAction
+ p.podChaos.Spec.Action = chaosmeshv1alpha1.PodFailureAction
}
if v1alpha1.PodChaosAction(action) == v1alpha1.ContainerKill {
- p.podChaos.Spec.Action = chaosv1alpha1.ContainerKillAction
+ p.podChaos.Spec.Action = chaosmeshv1alpha1.ContainerKillAction
}
- if chaosv1alpha1.PodChaosAction(action) == chaosv1alpha1.PodKillAction {
- p.podChaos.Spec.Action = chaosv1alpha1.PodKillAction
+ if chaosmeshv1alpha1.PodChaosAction(action) ==
chaosmeshv1alpha1.PodKillAction {
+ p.podChaos.Spec.Action = chaosmeshv1alpha1.PodKillAction
}
return p
}
@@ -388,7 +388,7 @@ func (p *podChaosBuilder) SetGracePeriod(gracePeriod int64)
PodChaosBuilder {
return p
}
-func (p *podChaosBuilder) Build() *chaosv1alpha1.PodChaos {
+func (p *podChaosBuilder) Build() *chaosmeshv1alpha1.PodChaos {
return p.podChaos
}
@@ -397,19 +397,19 @@ type NetworkChaosBuilder interface {
SetName(string) NetworkChaosBuilder
SetLabels(map[string]string) NetworkChaosBuilder
SetAnnotations(map[string]string) NetworkChaosBuilder
- SetPodSelector(*chaosv1alpha1.PodSelector) NetworkChaosBuilder
+ SetPodSelector(*chaosmeshv1alpha1.PodSelector) NetworkChaosBuilder
SetAction(string) NetworkChaosBuilder
SetDevice(string) NetworkChaosBuilder
SetDuration(*string) NetworkChaosBuilder
SetDirection(string) NetworkChaosBuilder
- SetTarget(*chaosv1alpha1.PodSelector) NetworkChaosBuilder
+ SetTarget(*chaosmeshv1alpha1.PodSelector) NetworkChaosBuilder
SetTargetDevice(string) NetworkChaosBuilder
- SetTcParameter(chaosv1alpha1.TcParameter) NetworkChaosBuilder
- Build() *chaosv1alpha1.NetworkChaos
+ SetTcParameter(chaosmeshv1alpha1.TcParameter) NetworkChaosBuilder
+ Build() *chaosmeshv1alpha1.NetworkChaos
}
type netWorkChaosBuilder struct {
- netWorkChaos *chaosv1alpha1.NetworkChaos
+ netWorkChaos *chaosmeshv1alpha1.NetworkChaos
}
func (n *netWorkChaosBuilder) SetNamespace(namespace string)
NetworkChaosBuilder {
@@ -432,31 +432,31 @@ func (n *netWorkChaosBuilder) SetAnnotations(annotations
map[string]string) Netw
return n
}
-func (n *netWorkChaosBuilder) SetPodSelector(selector
*chaosv1alpha1.PodSelector) NetworkChaosBuilder {
+func (n *netWorkChaosBuilder) SetPodSelector(selector
*chaosmeshv1alpha1.PodSelector) NetworkChaosBuilder {
n.netWorkChaos.Spec.PodSelector = *selector
return n
}
func (n *netWorkChaosBuilder) SetAction(action string) NetworkChaosBuilder {
//FIXME
- if chaosv1alpha1.NetworkChaosAction(action) ==
chaosv1alpha1.BandwidthAction {
- n.netWorkChaos.Spec.Action = chaosv1alpha1.BandwidthAction
+ if chaosmeshv1alpha1.NetworkChaosAction(action) ==
chaosmeshv1alpha1.BandwidthAction {
+ n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.BandwidthAction
}
if v1alpha1.NetworkChaosAction(action) == v1alpha1.Corruption {
- n.netWorkChaos.Spec.Action = chaosv1alpha1.CorruptAction
+ n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.CorruptAction
}
if v1alpha1.NetworkChaosAction(action) == v1alpha1.Partition {
- n.netWorkChaos.Spec.Action = chaosv1alpha1.PartitionAction
+ n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.PartitionAction
}
if v1alpha1.NetworkChaosAction(action) == v1alpha1.Loss {
- n.netWorkChaos.Spec.Action = chaosv1alpha1.LossAction
+ n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.LossAction
}
if v1alpha1.NetworkChaosAction(action) == v1alpha1.Duplication {
- n.netWorkChaos.Spec.Action = chaosv1alpha1.DuplicateAction
+ n.netWorkChaos.Spec.Action = chaosmeshv1alpha1.DuplicateAction
}
return n
@@ -473,11 +473,11 @@ func (n *netWorkChaosBuilder) SetDuration(duration
*string) NetworkChaosBuilder
}
func (n *netWorkChaosBuilder) SetDirection(direction string)
NetworkChaosBuilder {
- n.netWorkChaos.Spec.Direction = chaosv1alpha1.Direction(direction)
+ n.netWorkChaos.Spec.Direction = chaosmeshv1alpha1.Direction(direction)
return n
}
-func (n *netWorkChaosBuilder) SetTarget(selector *chaosv1alpha1.PodSelector)
NetworkChaosBuilder {
+func (n *netWorkChaosBuilder) SetTarget(selector
*chaosmeshv1alpha1.PodSelector) NetworkChaosBuilder {
n.netWorkChaos.Spec.Target = selector
return n
}
@@ -487,12 +487,12 @@ func (n *netWorkChaosBuilder)
SetTargetDevice(targetDevice string) NetworkChaosB
return n
}
-func (n *netWorkChaosBuilder) SetTcParameter(parameter
chaosv1alpha1.TcParameter) NetworkChaosBuilder {
+func (n *netWorkChaosBuilder) SetTcParameter(parameter
chaosmeshv1alpha1.TcParameter) NetworkChaosBuilder {
n.netWorkChaos.Spec.TcParameter = parameter
return n
}
-func (n *netWorkChaosBuilder) Build() *chaosv1alpha1.NetworkChaos {
+func (n *netWorkChaosBuilder) Build() *chaosmeshv1alpha1.NetworkChaos {
return n.netWorkChaos
}
@@ -514,17 +514,17 @@ type PodSelectorBuilder interface {
SetLabelSelector(map[string]string) PodSelectorBuilder
SetExpressionSelectors([]metav1.LabelSelectorRequirement)
PodSelectorBuilder
SetAnnotationSelectors(map[string]string) PodSelectorBuilder
- Build() *chaosv1alpha1.PodSelector
+ Build() *chaosmeshv1alpha1.PodSelector
}
func NewPodSelectorBuilder() PodSelectorBuilder {
return &podSelectorBuilder{
- podSelector: &chaosv1alpha1.PodSelector{},
+ podSelector: &chaosmeshv1alpha1.PodSelector{},
}
}
type podSelectorBuilder struct {
- podSelector *chaosv1alpha1.PodSelector
+ podSelector *chaosmeshv1alpha1.PodSelector
}
func (p *podSelectorBuilder) SetNamespaces(namespaces []string)
PodSelectorBuilder {
@@ -533,7 +533,7 @@ func (p *podSelectorBuilder) SetNamespaces(namespaces
[]string) PodSelectorBuild
}
func (p *podSelectorBuilder) SetSelectMode(mode string) PodSelectorBuilder {
- p.podSelector.Mode = chaosv1alpha1.SelectorMode(mode)
+ p.podSelector.Mode = chaosmeshv1alpha1.SelectorMode(mode)
return p
}
@@ -582,32 +582,32 @@ func (p *podSelectorBuilder)
SetAnnotationSelectors(annotationSelectors map[stri
return p
}
-func (p *podSelectorBuilder) Build() *chaosv1alpha1.PodSelector {
+func (p *podSelectorBuilder) Build() *chaosmeshv1alpha1.PodSelector {
return p.podSelector
}
-func DefaultPodChaos() *chaosv1alpha1.PodChaos {
- return &chaosv1alpha1.PodChaos{
+func DefaultPodChaos() *chaosmeshv1alpha1.PodChaos {
+ return &chaosmeshv1alpha1.PodChaos{
ObjectMeta: metav1.ObjectMeta{
Name: "shardingsphere-proxy",
Namespace: "default",
Labels: map[string]string{},
},
- Spec: chaosv1alpha1.PodChaosSpec{
- Action: chaosv1alpha1.ContainerKillAction,
+ Spec: chaosmeshv1alpha1.PodChaosSpec{
+ Action: chaosmeshv1alpha1.ContainerKillAction,
},
}
}
-func DefaultNetworkChaos() *chaosv1alpha1.NetworkChaos {
- return &chaosv1alpha1.NetworkChaos{
+func DefaultNetworkChaos() *chaosmeshv1alpha1.NetworkChaos {
+ return &chaosmeshv1alpha1.NetworkChaos{
ObjectMeta: metav1.ObjectMeta{
Name: "shardingsphere-proxy",
Namespace: "default",
Labels: map[string]string{},
},
- Spec: chaosv1alpha1.NetworkChaosSpec{
- Action: chaosv1alpha1.PartitionAction,
+ Spec: chaosmeshv1alpha1.NetworkChaosSpec{
+ Action: chaosmeshv1alpha1.PartitionAction,
Direction: "to",
},
}
diff --git a/shardingsphere-operator/pkg/kubernetes/chaosmesh/chaosmesh.go
b/shardingsphere-operator/pkg/kubernetes/chaosmesh/chaosmesh.go
index e54cb52..9e1d3bc 100644
--- a/shardingsphere-operator/pkg/kubernetes/chaosmesh/chaosmesh.go
+++ b/shardingsphere-operator/pkg/kubernetes/chaosmesh/chaosmesh.go
@@ -23,7 +23,7 @@ import (
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
- chaosmeshapi "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
+ chaosmeshv1alpha1 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -85,7 +85,7 @@ type getter struct {
type PodChaos interface{}
func (cg getter) GetPodChaosByNamespacedName(ctx context.Context,
namespacedName types.NamespacedName) (PodChaos, error) {
- chaos := &chaosmeshapi.PodChaos{}
+ chaos := &chaosmeshv1alpha1.PodChaos{}
if err := cg.Get(ctx, namespacedName, chaos); err != nil {
if apierrors.IsNotFound(err) {
return nil, nil
@@ -99,7 +99,7 @@ func (cg getter) GetPodChaosByNamespacedName(ctx
context.Context, namespacedName
type NetworkChaos interface{}
func (cg getter) GetNetworkChaosByNamespacedName(ctx context.Context,
namespacedName types.NamespacedName) (NetworkChaos, error) {
- chaos := &chaosmeshapi.NetworkChaos{}
+ chaos := &chaosmeshv1alpha1.NetworkChaos{}
if err := cg.Get(ctx, namespacedName, chaos); err != nil {
if apierrors.IsNotFound(err) {
return nil, nil
@@ -132,7 +132,7 @@ func (cs setter) CreatePodChaos(ctx context.Context,
sschaos *v1alpha1.ShardingS
if err != nil {
return err
}
- return cs.Client.Create(ctx, pc.(*chaosmeshapi.PodChaos))
+ return cs.Client.Create(ctx, pc.(*chaosmeshv1alpha1.PodChaos))
}
// UpdatePodChaos updates a pod chaos
@@ -141,11 +141,11 @@ func (cs setter) UpdatePodChaos(ctx context.Context,
podChaos PodChaos, sschaos
if err != nil {
return err
}
- s, ok := pc.(*chaosmeshapi.PodChaos)
+ s, ok := pc.(*chaosmeshv1alpha1.PodChaos)
if !ok {
return ErrConvert
}
- t, ok := podChaos.(*chaosmeshapi.PodChaos)
+ t, ok := podChaos.(*chaosmeshv1alpha1.PodChaos)
if !ok {
return ErrConvert
}
@@ -159,7 +159,7 @@ func (cs setter) UpdatePodChaos(ctx context.Context,
podChaos PodChaos, sschaos
// DeletePodChaos deletes a pod chaos
func (cs setter) DeletePodChaos(ctx context.Context, chao PodChaos) error {
- podChao, ok := chao.(*chaosmeshapi.PodChaos)
+ podChao, ok := chao.(*chaosmeshv1alpha1.PodChaos)
if !ok {
return ErrConvert
}
@@ -176,7 +176,7 @@ func (cs setter) CreateNetworkChaos(ctx context.Context,
sschaos *v1alpha1.Shard
if err != nil {
return err
}
- return cs.Client.Create(ctx, nc.(*chaosmeshapi.NetworkChaos))
+ return cs.Client.Create(ctx, nc.(*chaosmeshv1alpha1.NetworkChaos))
}
// UpdateNetworkChaos updates a network chaos
@@ -185,11 +185,11 @@ func (cs setter) UpdateNetworkChaos(ctx context.Context,
networkChaos NetworkCha
if err != nil {
return err
}
- s, ok := pc.(*chaosmeshapi.NetworkChaos)
+ s, ok := pc.(*chaosmeshv1alpha1.NetworkChaos)
if !ok {
return ErrConvert
}
- t, ok := networkChaos.(*chaosmeshapi.NetworkChaos)
+ t, ok := networkChaos.(*chaosmeshv1alpha1.NetworkChaos)
if !ok {
return ErrConvert
}
@@ -202,7 +202,7 @@ func (cs setter) UpdateNetworkChaos(ctx context.Context,
networkChaos NetworkCha
}
func (cs setter) DeleteNetworkChaos(ctx context.Context, chao NetworkChaos)
error {
- networkChaos, ok := chao.(*chaosmeshapi.NetworkChaos)
+ networkChaos, ok := chao.(*chaosmeshv1alpha1.NetworkChaos)
if !ok {
return ErrConvert
}
diff --git a/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks/store.go
b/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks/store.go
index 9caffb4..1cb3780 100644
--- a/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks/store.go
+++ b/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks/store.go
@@ -10,6 +10,7 @@ import (
v1alpha1
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
chaosmesh
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
+
gomock "github.com/golang/mock/gomock"
types "k8s.io/apimachinery/pkg/types"
)
diff --git a/shardingsphere-operator/pkg/kubernetes/configmap/builders.go
b/shardingsphere-operator/pkg/kubernetes/configmap/builders.go
index 4c68f14..db0ab46 100644
--- a/shardingsphere-operator/pkg/kubernetes/configmap/builders.go
+++ b/shardingsphere-operator/pkg/kubernetes/configmap/builders.go
@@ -21,6 +21,7 @@ import (
"reflect"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
diff --git a/shardingsphere-operator/pkg/pressure/pressure.go
b/shardingsphere-operator/pkg/pressure/pressure.go
index 061bc3b..49cfb14 100644
--- a/shardingsphere-operator/pkg/pressure/pressure.go
+++ b/shardingsphere-operator/pkg/pressure/pressure.go
@@ -24,8 +24,6 @@ import (
"sync"
"time"
- "github.com/database-mesh/golang-sdk/pkg/random"
-
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
_ "github.com/go-sql-driver/mysql"
)
@@ -41,8 +39,7 @@ type Pressure struct {
}
var (
- db *sql.DB
- totalReq int
+ db *sql.DB
)
type Result struct {
@@ -68,9 +65,9 @@ func NewPressure(name string, tasks []v1alpha1.DistSQL)
*Pressure {
}
}
-// todo: get conn args by labels over string
func initDB(connArgs string) error {
var err error
+
db, err = sql.Open("mysql", connArgs)
if err != nil {
return err
@@ -84,21 +81,18 @@ func initDB(connArgs string) error {
func (p *Pressure) Run(ctx context.Context, pressureCfg *v1alpha1.PressureCfg)
{
p.Active = true
- totalReq = 0
+ //when all task finished,update active
+ defer func() {
+ p.Active = false
+ }()
- //judge nil for simplify test
- if db == nil {
- if err := initDB(pressureCfg.SsHost); err != nil {
- p.Err = err
- return
- }
- defer func() {
- if err := db.Close(); err != nil {
- p.Err = err
- }
- }()
+ if err := initDB(pressureCfg.SsHost); err != nil {
+ p.Err = err
+ return
}
+ defer db.Close()
+
result := &p.Result
pressureCtx, cancel := context.WithTimeout(context.Background(),
pressureCfg.Duration.Duration)
defer cancel()
@@ -106,7 +100,7 @@ func (p *Pressure) Run(ctx context.Context, pressureCfg
*v1alpha1.PressureCfg) {
resCh := make(chan bool, 1000)
//handle result
- go p.handleResponse(pressureCtx, resCh, result)
+ go p.handleResponse(resCh, result)
//statistics the running time
start := time.Now()
@@ -119,7 +113,6 @@ FOR:
break FOR
case <-ticker.C:
for i := 0; i < pressureCfg.ConcurrentNum; i++ {
- totalReq += pressureCfg.ReqNum
//todo: handle err
//put wg here to prevent: when root ctx is
closed,but some exec task do not start yet
@@ -139,9 +132,6 @@ FOR:
//wait collect results channel finished
<-p.finishSignalCh
-
- //when all task finished,update active
- p.Active = false
}
func (p *Pressure) exec(ctx context.Context, times int, res chan bool) {
@@ -151,30 +141,21 @@ func (p *Pressure) exec(ctx context.Context, times int,
res chan bool) {
case <-ctx.Done():
return
default:
- }
- if len(p.Tasks) == 0 {
- return
- }
- for i := range p.Tasks {
- //generate diff sql, put result into channel
- args := randomArgs(p.Tasks[i].Args)
- _, err := db.Exec(p.Tasks[i].SQL, args)
- res <- err == nil
+ if len(p.Tasks) == 0 {
+ return
+ }
+ for i := range p.Tasks {
+ //generate diff sql, put result into channel
+ args := randomArgs(p.Tasks[i].Args)
+ _, err := db.Exec(p.Tasks[i].SQL, args...)
+
+ res <- err == nil
+ }
}
}
}
-func (p *Pressure) handleResponse(ctx context.Context, resCh chan bool, result
*Result) {
-For:
- for {
- select {
- case <-ctx.Done():
- break For
- case ret := <-resCh:
- //todo: add more msg
- handle(ret, result)
- }
- }
+func (p *Pressure) handleResponse(resCh chan bool, result *Result) {
//get left handleResponse
for ret := range resCh {
@@ -191,12 +172,13 @@ func handle(ret bool, result *Result) {
result.Success++
}
result.Total++
+
}
-func randomArgs(args []string) []string {
- var ret []string
+func randomArgs(args []string) []any {
+ var ret []any
for i := range args {
- randomArg := fmt.Sprintf("%s-%s", args[i], random.StringN(4))
+ randomArg := fmt.Sprintf("%s-%d", args[i],
time.Now().UnixNano())
ret = append(ret, randomArg)
}
return ret
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
index 855e0fe..dc6663a 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/common"
+
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
diff --git a/shardingsphere-operator/test/e2e/e2e_suite_test.go
b/shardingsphere-operator/test/e2e/e2e_suite_test.go
index bd7566b..576af0b 100644
--- a/shardingsphere-operator/test/e2e/e2e_suite_test.go
+++ b/shardingsphere-operator/test/e2e/e2e_suite_test.go
@@ -19,6 +19,9 @@ package e2e
import (
"context"
+ mockChaos
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks"
+ "github.com/golang/mock/gomock"
+ clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"os/exec"
"path/filepath"
"testing"
@@ -51,6 +54,7 @@ var (
ctx context.Context
cancel context.CancelFunc
err error
+ mockchaos *mockChaos.MockChaos
)
func TestControllers(t *testing.T) {
@@ -96,6 +100,7 @@ var _ = BeforeSuite(func() {
Expect(v1alpha1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
Expect(dbmeshv1alpha1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
+ Expect(clientgoscheme.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
//+kubebuilder:scaffold:scheme
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
@@ -125,21 +130,19 @@ var _ = BeforeSuite(func() {
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
- /*
- ctl := gomock.NewController(GinkgoT())
- clientset, err := clientset.NewForConfig(k8sManager.GetConfig())
- err = (&controllers.ShardingSphereChaosReconciler{
- Client: k8sManager.GetClient(),
- Scheme: k8sManager.GetScheme(),
- Log: logf.Log,
- Chaos: mockChaos.NewMockChaos(ctl),
- Job: job.NewJob(k8sManager.GetClient()),
- ConfigMap:
configmap.NewConfigMapClient(k8sManager.GetClient()),
- Events:
k8sManager.GetEventRecorderFor("shardingsphere-chaos-controller"),
- ClientSet: clientset,
- }).SetupWithManager(k8sManager)
- Expect(err).ToNot(HaveOccurred())
- */
+ ctl := gomock.NewController(GinkgoT())
+ mockchaos = mockChaos.NewMockChaos(ctl)
+
+ err = (&controllers.ShardingSphereChaosReconciler{
+ Client: k8sManager.GetClient(),
+ Scheme: k8sManager.GetScheme(),
+ Log: logf.Log,
+ Events:
k8sManager.GetEventRecorderFor("shardingsphere-chaos-controller"),
+ Chaos: mockchaos,
+ ExecCtrls: make([]*controllers.ExecCtrl, 0),
+ ConfigMap: configmap.NewConfigMapClient(k8sManager.GetClient()),
+ }).SetupWithManager(k8sManager)
+ Expect(err).ToNot(HaveOccurred())
go func() {
defer GinkgoRecover()
diff --git
a/shardingsphere-operator/test/e2e/shardingsphere_chaos_controller_test.go
b/shardingsphere-operator/test/e2e/shardingsphere_chaos_controller_test.go
index 77d3a99..2c30a15 100644
--- a/shardingsphere-operator/test/e2e/shardingsphere_chaos_controller_test.go
+++ b/shardingsphere-operator/test/e2e/shardingsphere_chaos_controller_test.go
@@ -17,77 +17,145 @@
package e2e
-/*
import (
- "fmt"
- "math/rand"
+ "database/sql"
+ mockChaos
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh/mocks"
+ "github.com/golang/mock/gomock"
+ "regexp"
"time"
+ "bou.ke/monkey"
+ "github.com/DATA-DOG/go-sqlmock"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
- corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
+func mockchaosStub(chaos *mockChaos.MockChaos) {
+ chaos.EXPECT().NewNetworkChaos(gomock.Any(),
gomock.Any()).Return(gomock.Any()).AnyTimes()
+ chaos.EXPECT().NewPodChaos(gomock.Any(),
gomock.Any()).Return(gomock.Any()).AnyTimes()
+
+ chaos.EXPECT().CreatePodChaos(gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+ chaos.EXPECT().CreateNetworkChaos(gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+
+ chaos.EXPECT().DeleteNetworkChaos(gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+ chaos.EXPECT().DeletePodChaos(gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+
+ chaos.EXPECT().UpdatePodChaos(gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+ chaos.EXPECT().UpdateNetworkChaos(gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil).AnyTimes()
+
+ chaos.EXPECT().GetNetworkChaosByNamespacedName(gomock.Any(),
gomock.Any()).Return(gomock.Any(), nil).AnyTimes()
+ chaos.EXPECT().GetPodChaosByNamespacedName(gomock.Any(),
gomock.Any()).Return(gomock.Any(), nil).AnyTimes()
+}
+
+func mockDBStub(mock sqlmock.Sqlmock) {
+ mock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+}
+
+type compare struct {
+ phase v1alpha1.ChaosPhase
+ conditionStatus []metav1.ConditionStatus
+}
+
var _ = Describe("ShardingSphereChaos", func() {
- var d = "5m"
+ var (
+ testNamespacedName = types.NamespacedName{
+ Namespace: "default",
+ Name: "testsschaos",
+ }
+ duration = "5s"
+ db *sql.DB
+ )
- Context("check related resource created by ShardingSphereChaos
Controller", func() {
+ BeforeEach(func() {
var (
- ssChaos *v1alpha1.ShardingSphereChaos
- name = fmt.Sprintf("%s-%d", "test.sschaos-",
rand.Int31())
- namespace = "default"
+ dbmock sqlmock.Sqlmock
+ err error
)
- BeforeEach(func() {
- ssChaos = &v1alpha1.ShardingSphereChaos{
+
+ db, dbmock, err = sqlmock.New()
+ Expect(err).To(BeNil())
+ Expect(db).NotTo(BeNil())
+ Expect(dbmock).NotTo(BeNil())
+
+ monkey.Patch(sql.Open, func(driverName, dataSourceName string)
(*sql.DB, error) {
+ return db, nil
+ })
+ mockchaosStub(mockchaos)
+ mockDBStub(dbmock)
+ })
+
+ AfterEach(func() {
+ monkey.UnpatchAll()
+ db.Close()
+ })
+
+ Context("reconcile ShardingSphereChaos", func() {
+ var desireStatus = compare{
+ phase: v1alpha1.AfterChaos,
+ conditionStatus:
[]metav1.ConditionStatus{metav1.ConditionTrue, metav1.ConditionTrue,
metav1.ConditionTrue, metav1.ConditionTrue},
+ }
+
+ It("should create successfully", func() {
+ ssChaos := &v1alpha1.ShardingSphereChaos{
ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: namespace,
- Labels: map[string]string{
- "app": "shardingsphere-proxy",
- },
- Annotations: map[string]string{
- "spec/mode": "all",
- },
+ Name: testNamespacedName.Name,
+ Namespace: testNamespacedName.Namespace,
},
Spec: v1alpha1.ShardingSphereChaosSpec{
EmbedChaos: v1alpha1.EmbedChaos{
PodChaos:
&v1alpha1.PodChaosSpec{
PodSelector:
v1alpha1.PodSelector{
- Namespaces:
[]string{"mesh-test"},
LabelSelectors:
map[string]string{
-
"app.kubernetes.io/component": "zookeeper-new",
+
"app.kubernetes.io/component": "zookeeper",
},
},
Action:
v1alpha1.PodFailure,
Params:
v1alpha1.PodChaosParams{
PodFailure:
&v1alpha1.PodFailureParams{
-
Duration: &d,
+
Duration: &duration,
},
},
},
},
+ PressureCfg: v1alpha1.PressureCfg{
+ SsHost: "127.0.0.1:3306/ds_1",
+ Duration:
metav1.Duration{Duration: 10 * time.Second},
+ ReqTime:
metav1.Duration{Duration: 5 * time.Second},
+ DistSQLs: []v1alpha1.DistSQL{
+ {
+ SQL: "REGISTER
STORAGE UNIT ?()",
+ Args:
[]string{"ds_1"},
+ },
+ },
+ ConcurrentNum: 2,
+ ReqNum: 5,
+ },
},
+ Status: v1alpha1.ShardingSphereChaosStatus{},
}
- Expect(k8sClient.Create(ctx, ssChaos)).To(BeNil())
- })
- AfterEach(func() {
- Expect(k8sClient.Delete(ctx, ssChaos)).To(BeNil())
- })
+ Expect(k8sClient.Create(ctx, ssChaos)).Should(Succeed())
+
+ Eventually(func() compare {
+ var chaos v1alpha1.ShardingSphereChaos
+ Expect(k8sClient.Get(ctx, testNamespacedName,
&chaos)).Should(Succeed())
+ now := compare{
+ phase: chaos.Status.Phase,
+ conditionStatus:
make([]metav1.ConditionStatus, 0),
+ }
+ for i := range chaos.Status.Conditions {
+ now.conditionStatus =
append(now.conditionStatus, chaos.Status.Conditions[i].Status)
+ }
+
+ return now
+ }, 25*time.Second,
1*time.Second).Should(Equal(desireStatus))
- It("should create configmap", func() {
- configmap := &corev1.ConfigMap{}
- namespacedName := types.NamespacedName{Name: name,
Namespace: namespace}
- Eventually(func() bool {
- err := k8sClient.Get(ctx, namespacedName,
configmap)
- return err == nil
- }, time.Second*10,
time.Millisecond*250).Should(BeTrue())
+ Expect(k8sClient.Delete(ctx, ssChaos)).Should(Succeed())
})
})
})
-*/