This is an automated email from the ASF dual-hosted git repository.

zhangke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-infra-e2e.git


The following commit(s) were added to refs/heads/main by this push:
     new 25b6c78  Implement the run and wait part of setup (#13)
25b6c78 is described below

commit 25b6c78a540734480c2b0f8b5e4a22201646da68
Author: Humbert Zhang <[email protected]>
AuthorDate: Sat Mar 13 16:33:20 2021 +0800

    Implement the run and wait part of setup (#13)
    
    * Implement the run and wait part of setup
---
 examples/simple/e2e.yaml            |  16 ++++++
 internal/components/setup/common.go | 108 ++++++++++++++++++++++++++++++++++++
 internal/components/setup/kind.go   | 106 ++++++++++++++++++++++-------------
 internal/config/e2eConfig.go        |   5 +-
 internal/config/globalConfig.go     |   2 +-
 5 files changed, 194 insertions(+), 43 deletions(-)

diff --git a/examples/simple/e2e.yaml b/examples/simple/e2e.yaml
index 480099b..778718e 100644
--- a/examples/simple/e2e.yaml
+++ b/examples/simple/e2e.yaml
@@ -33,6 +33,22 @@ setup:
         - namespace: default
           resource: pod
           for: condition=Ready
+  runs: # commands are serial within one code block and parallel between code 
blocks
+    - command: | # it can be a shell script or anything executable
+        cp $TMPDIR/e2e-k8s.config ~/.kube/config
+        kubectl create deployment nginx1 --image=nginx
+      wait:
+        - namespace: default
+          resource: deployment/nginx1
+          for: condition=Available
+    - command: |
+        cp $TMPDIR/e2e-k8s.config ~/.kube/config
+        kubectl create deployment nginx2 \
+                                  --image=nginx
+      wait:
+        - namespace: default
+          resource: deployment/nginx2
+          for: condition=Available
   timeout: 600
 
 verify:
diff --git a/internal/components/setup/common.go 
b/internal/components/setup/common.go
new file mode 100644
index 0000000..cae40c4
--- /dev/null
+++ b/internal/components/setup/common.go
@@ -0,0 +1,108 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package setup
+
+import (
+       "fmt"
+       "io/ioutil"
+       "time"
+
+       "github.com/apache/skywalking-infra-e2e/internal/config"
+       "github.com/apache/skywalking-infra-e2e/internal/logger"
+       "github.com/apache/skywalking-infra-e2e/internal/util"
+)
+
+// RunCommandsAndWait Concurrently run commands and wait for conditions.
+func RunCommandsAndWait(runs []config.Run, timeout time.Duration) error {
+       waitSet := util.NewWaitSet(timeout)
+
+       for idx := range runs {
+               run := runs[idx]
+               commands := run.Command
+               if len(commands) < 1 {
+                       continue
+               }
+
+               waitSet.WaitGroup.Add(1)
+               go executeCommandsAndWait(commands, run.Waits, waitSet)
+       }
+
+       go func() {
+               waitSet.WaitGroup.Wait()
+               close(waitSet.FinishChan)
+       }()
+
+       select {
+       case <-waitSet.FinishChan:
+               logger.Log.Infof("all commands executed successfully")
+       case err := <-waitSet.ErrChan:
+               logger.Log.Errorf("execute command error")
+               return err
+       case <-time.After(waitSet.Timeout):
+               return fmt.Errorf("wait for commands run timeout after %d 
seconds", int(timeout.Seconds()))
+       }
+
+       return nil
+}
+
+func executeCommandsAndWait(commands string, waits []config.Wait, waitSet 
*util.WaitSet) {
+       defer waitSet.WaitGroup.Done()
+
+       // executes commands
+       logger.Log.Infof("executing commands [%s]", commands)
+       result, err := util.ExecuteCommand(commands)
+       if err != nil {
+               err = fmt.Errorf("commands: [%s] runs error: %s", commands, err)
+               waitSet.ErrChan <- err
+       }
+       logger.Log.Infof("executed commands [%s], result: %s", commands, result)
+
+       // waits for conditions meet
+       for idx := range waits {
+               wait := waits[idx]
+               logger.Log.Infof("waiting for %+v", wait)
+
+               kubeConfigYaml, err := ioutil.ReadFile(kubeConfigPath)
+               if err != nil {
+                       err = fmt.Errorf("read kube config failed: %s", err)
+                       waitSet.ErrChan <- err
+               }
+
+               options, err := getWaitOptions(kubeConfigYaml, &wait)
+               if err != nil {
+                       err = fmt.Errorf("commands: [%s] get wait options 
error: %s", commands, err)
+                       waitSet.ErrChan <- err
+               }
+
+               err = options.RunWait()
+               if err != nil {
+                       err = fmt.Errorf("commands: [%s] waits error: %s", 
commands, err)
+                       waitSet.ErrChan <- err
+                       return
+               }
+               logger.Log.Infof("wait %+v condition met", wait)
+       }
+}
+
+// NewTimeout calculates new timeout since timeBefore.
+func NewTimeout(timeBefore time.Time, timeout time.Duration) time.Duration {
+       elapsed := time.Since(timeBefore)
+       newTimeout := timeout - elapsed
+       return newTimeout
+}
diff --git a/internal/components/setup/kind.go 
b/internal/components/setup/kind.go
index 2b6c100..c4c4f27 100644
--- a/internal/components/setup/kind.go
+++ b/internal/components/setup/kind.go
@@ -65,6 +65,16 @@ func KindSetup(e2eConfig *config.E2EConfig) error {
                return nil
        }
 
+       if err := createKindCluster(kindConfigPath); err != nil {
+               return err
+       }
+
+       c, dc, err := util.ConnectToK8sCluster(kubeConfigPath)
+       if err != nil {
+               logger.Log.Errorf("connect to k8s cluster failed according to 
config file: %s", kubeConfigPath)
+               return err
+       }
+
        timeout := e2eConfig.Setup.Timeout
        var waitTimeout time.Duration
        if timeout <= 0 {
@@ -75,19 +85,26 @@ func KindSetup(e2eConfig *config.E2EConfig) error {
 
        logger.Log.Debugf("wait timeout is %d seconds", 
int(waitTimeout.Seconds()))
 
-       if err := createKindCluster(kindConfigPath); err != nil {
-               return err
-       }
+       // record time now
+       timeNow := time.Now()
 
-       c, dc, err := util.ConnectToK8sCluster(kubeConfigPath)
+       err = createManifestsAndWait(c, dc, manifests, waitTimeout)
        if err != nil {
-               logger.Log.Errorf("connect to k8s cluster failed according to 
config file: %s", kubeConfigPath)
                return err
        }
 
-       err = createManifestsAndWait(c, dc, manifests, waitTimeout)
-       if err != nil {
-               return err
+       // calculates new timeout. manifests and run of setup uses the same 
countdown.
+       runWaitTimeout := NewTimeout(timeNow, waitTimeout)
+       if runWaitTimeout <= 0 {
+               return fmt.Errorf("kind setup timeout")
+       }
+
+       if len(e2eConfig.Setup.Runs) > 0 {
+               logger.Log.Debugf("executing commands...")
+               err := RunCommandsAndWait(e2eConfig.Setup.Runs, runWaitTimeout)
+               if err != nil {
+                       return err
+               }
        }
        return nil
 }
@@ -129,42 +146,17 @@ func createManifestsAndWait(c *kubernetes.Clientset, dc 
dynamic.Interface, manif
                        continue
                }
 
-               for _, wait := range waits {
-                       if strings.Contains(wait.Resource, "/") && 
wait.LabelSelector != "" {
-                               return fmt.Errorf("when passing 
resource.group/resource.name in Resource, the labelSelector can not be set at 
the same time")
-                       }
-
-                       restClientGetter := 
util.NewSimpleRESTClientGetter(wait.Namespace, string(kubeConfigYaml))
-                       silenceOutput, _ := os.Open(os.DevNull)
-                       ioStreams := genericclioptions.IOStreams{In: os.Stdin, 
Out: silenceOutput, ErrOut: os.Stderr}
-                       waitFlags := ctlwait.NewWaitFlags(restClientGetter, 
ioStreams)
-                       // global timeout is set in e2e.yaml
-                       waitFlags.Timeout = constant.SingleDefaultWaitTimeout
-                       waitFlags.ForCondition = wait.For
-
-                       var args []string
-                       // resource.group/resource.name OR resource.group
-                       if wait.Resource != "" {
-                               args = append(args, wait.Resource)
-                       } else {
-                               return fmt.Errorf("resource must be provided in 
wait block")
-                       }
-
-                       if wait.LabelSelector != "" {
-                               waitFlags.ResourceBuilderFlags.LabelSelector = 
&wait.LabelSelector
-                       } else if !strings.Contains(wait.Resource, "/") {
-                               // if labelSelector is nil and resource only 
provide resource.group, check all resources.
-                               waitFlags.ResourceBuilderFlags.All = 
&constant.True
-                       }
+               for idx := range waits {
+                       wait := waits[idx]
+                       logger.Log.Infof("waiting for %+v", wait)
 
-                       options, err := waitFlags.ToOptions(args)
+                       options, err := getWaitOptions(kubeConfigYaml, &wait)
                        if err != nil {
                                return err
                        }
 
-                       logger.Log.Infof("waiting for %+v", wait)
                        waitSet.WaitGroup.Add(1)
-                       go concurrentlyWait(wait, options, waitSet)
+                       go concurrentlyWait(&wait, options, waitSet)
                }
        }
 
@@ -186,6 +178,41 @@ func createManifestsAndWait(c *kubernetes.Clientset, dc 
dynamic.Interface, manif
        return nil
 }
 
+func getWaitOptions(kubeConfigYaml []byte, wait *config.Wait) (options 
*ctlwait.WaitOptions, err error) {
+       if strings.Contains(wait.Resource, "/") && wait.LabelSelector != "" {
+               return nil, fmt.Errorf("when passing 
resource.group/resource.name in Resource, the labelSelector can not be set at 
the same time")
+       }
+
+       restClientGetter := util.NewSimpleRESTClientGetter(wait.Namespace, 
string(kubeConfigYaml))
+       silenceOutput, _ := os.Open(os.DevNull)
+       ioStreams := genericclioptions.IOStreams{In: os.Stdin, Out: 
silenceOutput, ErrOut: os.Stderr}
+       waitFlags := ctlwait.NewWaitFlags(restClientGetter, ioStreams)
+       // global timeout is set in e2e.yaml
+       waitFlags.Timeout = constant.SingleDefaultWaitTimeout
+       waitFlags.ForCondition = wait.For
+
+       var args []string
+       // resource.group/resource.name OR resource.group
+       if wait.Resource != "" {
+               args = append(args, wait.Resource)
+       } else {
+               return nil, fmt.Errorf("resource must be provided in wait 
block")
+       }
+
+       if wait.LabelSelector != "" {
+               waitFlags.ResourceBuilderFlags.LabelSelector = 
&wait.LabelSelector
+       } else if !strings.Contains(wait.Resource, "/") {
+               // if labelSelector is nil and resource only provide 
resource.group, check all resources.
+               waitFlags.ResourceBuilderFlags.All = &constant.True
+       }
+
+       options, err = waitFlags.ToOptions(args)
+       if err != nil {
+               return nil, err
+       }
+       return options, nil
+}
+
 func createByManifest(c *kubernetes.Clientset, dc dynamic.Interface, manifest 
config.Manifest) error {
        files, err := util.GetManifests(manifest.GetPath())
        if err != nil {
@@ -204,13 +231,14 @@ func createByManifest(c *kubernetes.Clientset, dc 
dynamic.Interface, manifest co
        return nil
 }
 
-func concurrentlyWait(wait config.Wait, options *ctlwait.WaitOptions, waitSet 
*util.WaitSet) {
+func concurrentlyWait(wait *config.Wait, options *ctlwait.WaitOptions, waitSet 
*util.WaitSet) {
        defer waitSet.WaitGroup.Done()
 
        err := options.RunWait()
        if err != nil {
                err = fmt.Errorf("wait strategy :%+v, err: %s", wait, err)
                waitSet.ErrChan <- err
+               return
        }
        logger.Log.Infof("wait %+v condition met", wait)
 }
diff --git a/internal/config/e2eConfig.go b/internal/config/e2eConfig.go
index 7b54aaf..9ccbe98 100644
--- a/internal/config/e2eConfig.go
+++ b/internal/config/e2eConfig.go
@@ -30,9 +30,8 @@ type Setup struct {
        Env       string     `yaml:"env"`
        File      string     `yaml:"file"`
        Manifests []Manifest `yaml:"manifests"`
-       // Run is not supported yet
-       Run     []Run `yaml:"run"`
-       Timeout int   `yaml:"timeout"`
+       Runs      []Run      `yaml:"runs"`
+       Timeout   int        `yaml:"timeout"`
 }
 
 func (s *Setup) GetFile() string {
diff --git a/internal/config/globalConfig.go b/internal/config/globalConfig.go
index fb72c9c..7cef38e 100644
--- a/internal/config/globalConfig.go
+++ b/internal/config/globalConfig.go
@@ -28,7 +28,7 @@ import (
        "gopkg.in/yaml.v2"
 )
 
-// GlobalE2EConfig store E2EConfig which can be used globally.
+// GlobalE2EConfig stores E2EConfig which can be used globally.
 type GlobalE2EConfig struct {
        Error     error
        E2EConfig E2EConfig

Reply via email to