This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 885d2bd99ca5ce46bad901df5082d6e424a1500b
Author: nicolaferraro <ni.ferr...@gmail.com>
AuthorDate: Mon Jan 10 14:43:43 2022 +0100

    Fix #1107: disable applier code to detect real CI errors
---
 pkg/install/kamelets.go    |  94 +++++++++++++++++++++++++++++++++++++--
 pkg/resources/resources.go |  12 ++---
 pkg/trait/deployer.go      | 108 ++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 203 insertions(+), 11 deletions(-)

diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go
index 4ff4572..82a818b 100644
--- a/pkg/install/kamelets.go
+++ b/pkg/install/kamelets.go
@@ -19,21 +19,33 @@ package install
 
 import (
        "context"
+       "errors"
        "fmt"
        "io/fs"
+       "net/http"
        "os"
        "path"
        "path/filepath"
        "strings"
+       "sync"
+       "sync/atomic"
 
        "golang.org/x/sync/errgroup"
 
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
+
+       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+       logf "sigs.k8s.io/controller-runtime/pkg/log"
+
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
        "github.com/apache/camel-k/pkg/client"
        "github.com/apache/camel-k/pkg/util"
        "github.com/apache/camel-k/pkg/util/defaults"
        "github.com/apache/camel-k/pkg/util/kubernetes"
-       "k8s.io/apimachinery/pkg/runtime"
+       "github.com/apache/camel-k/pkg/util/patch"
 )
 
 const (
@@ -41,6 +53,13 @@ const (
        defaultKameletDir = "/kamelets/"
 )
 
+var (
+       log = logf.Log
+
+       hasServerSideApply atomic.Value
+       tryServerSideApply sync.Once
+)
+
 // KameletCatalog installs the bundled Kamelets into the specified namespace.
 func KameletCatalog(ctx context.Context, c client.Client, namespace string) 
error {
        kameletDir := os.Getenv(kameletDirEnv)
@@ -58,7 +77,7 @@ func KameletCatalog(ctx context.Context, c client.Client, 
namespace string) erro
        }
 
        g, gCtx := errgroup.WithContext(ctx)
-       applier := c.ServerOrClientSideApplier()
+
        err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err 
error) error {
                if err != nil {
                        return err
@@ -75,9 +94,31 @@ func KameletCatalog(ctx context.Context, c client.Client, 
namespace string) erro
                        if err != nil {
                                return err
                        }
-                       if err := applier.Apply(gCtx, kamelet); err != nil {
+                       once := false
+                       tryServerSideApply.Do(func() {
+                               once = true
+                               if err = serverSideApply(gCtx, c, kamelet); err 
!= nil {
+                                       if isIncompatibleServerError(err) {
+                                               log.Info("Fallback to 
client-side apply for installing bundled Kamelets")
+                                               hasServerSideApply.Store(false)
+                                               err = nil
+                                       } else {
+                                               tryServerSideApply = sync.Once{}
+                                       }
+                               } else {
+                                       hasServerSideApply.Store(true)
+                               }
+                       })
+                       if err != nil {
                                return err
                        }
+                       if v := hasServerSideApply.Load(); v.(bool) {
+                               if !once {
+                                       return serverSideApply(gCtx, c, kamelet)
+                               }
+                       } else {
+                               return clientSideApply(gCtx, c, kamelet)
+                       }
                        return nil
                })
                return nil
@@ -89,6 +130,53 @@ func KameletCatalog(ctx context.Context, c client.Client, 
namespace string) erro
        return g.Wait()
 }
 
+func serverSideApply(ctx context.Context, c client.Client, resource 
runtime.Object) error {
+       target, err := patch.PositiveApplyPatch(resource)
+       if err != nil {
+               return err
+       }
+       return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, 
ctrl.FieldOwner("camel-k-operator"))
+}
+
+func clientSideApply(ctx context.Context, c client.Client, resource 
ctrl.Object) error {
+       err := c.Create(ctx, resource)
+       if err == nil {
+               return nil
+       } else if !k8serrors.IsAlreadyExists(err) {
+               return fmt.Errorf("error during create resource: %s/%s: %w", 
resource.GetNamespace(), resource.GetName(), err)
+       }
+       object := &unstructured.Unstructured{}
+       object.SetNamespace(resource.GetNamespace())
+       object.SetName(resource.GetName())
+       object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+       err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
+       if err != nil {
+               return err
+       }
+       p, err := patch.PositiveMergePatch(object, resource)
+       if err != nil {
+               return err
+       } else if len(p) == 0 {
+               return nil
+       }
+       return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
+}
+
+func isIncompatibleServerError(err error) bool {
+       // First simpler check for older servers (e.g. OpenShift 3.11)
+       if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+               return true
+       }
+       // 415: Unsupported media type means we're talking to a server which 
doesn't
+       // support server-side apply.
+       var serr *k8serrors.StatusError
+       if errors.As(err, &serr) {
+               return serr.Status().Code == http.StatusUnsupportedMediaType
+       }
+       // Non-StatusError means the error isn't because the server is 
incompatible.
+       return false
+}
+
 func loadKamelet(path string, namespace string, scheme *runtime.Scheme) 
(*v1alpha1.Kamelet, error) {
        content, err := util.ReadFile(path)
        if err != nil {
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index e64bea2..bcc7095 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -145,16 +145,16 @@ var assets = func() http.FileSystem {
                "/crd/bases/camel.apache.org_integrations.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             "camel.apache.org_integrations.yaml",
                        modTime:          time.Time{},
-                       uncompressedSize: 366985,
+                       uncompressedSize: 367530,
 
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x8a\x53\x4e\xea\x93\xb4\x11\x29\x3b\x99\x9d\xbb\xe3\x3b\xf5\xa5\x34\x92\x9c\xd5\x8d\x2d\xab\x2c\x25\xf9\x52\x4e\x36\x0b\x76\x83\x24\x56\xdd\x40\x2f\x80\xa6\xcc\xbd\xbe\xff\xfb\x2d\x1c\x00\xfd\xe0\xab\x81\x16\xe9\x38\x53\x8d\xa9\x9a\x98\x14\xfb\x34\x1e\xe7\x7d\x0e\xce\xf9\x12\x46\xfb\x1b\x5f\x7c\x09\xaf\x59\x42\xb9\xa2\x29\x68\x01\x7a\x4e\xe1\xbc\x20\xc9\x9c\xc2\x9d\x98\xea\x
 [...]
+                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x8a\x53\x4e\xea\x93\xb4\x11\x29\x3b\x99\x9d\xbb\xe3\x3b\xf5\xa5\x34\x92\x9c\xd5\x8d\x2d\xab\x2c\x25\xf9\x52\x4e\x36\x0b\x76\x83\x24\x56\xdd\x40\x2f\x80\xa6\xcc\xbd\xbe\xff\xfb\x2d\x1c\x00\xfd\xe0\xab\x81\x16\xe9\x38\x53\x8d\xa9\x9a\x98\x14\xfb\x34\x1e\xe7\x7d\x0e\xce\xf9\x12\x46\xfb\x1b\x5f\x7c\x09\xaf\x59\x42\xb9\xa2\x29\x68\x01\x7a\x4e\xe1\xbc\x20\xc9\x9c\xc2\x9d\x98\xea\x
 [...]
                },
                "/crd/bases/camel.apache.org_kameletbindings.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             
"camel.apache.org_kameletbindings.yaml",
                        modTime:          time.Time{},
-                       uncompressedSize: 432125,
+                       uncompressedSize: 432720,
 
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x0a\x94\x9c\xfa\x24\x6d\x44\xca\xce\xcc\xce\xdd\xf1\x9d\xfa\x52\x1a\x59\xce\xe8\xc6\x96\x59\x96\xe2\x7c\x29\x27\x9b\x05\xbb\x41\x12\xab\x6e\xa0\x17\x40\x53\xe2\x5e\xdf\xff\xfd\x16\x0e\x80\x7e\xf0\x25\x9c\xa6\xa8\x28\x3b\x8d\xa9\x9a\x98\x22\xfb\x34\x5e\xe7\xfd\x7a\x41\x06\x8f\x37\xbe\x7a\x41\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\xce\x0a\x9a\xcc\x18\xb9\x96\x13\x73\x47\x
 [...]
+                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x0a\x94\x9c\xfa\x24\x6d\x44\xca\xce\xcc\xce\xdd\xf1\x9d\xfa\x52\x1a\x59\xce\xe8\xc6\x96\x59\x96\xe2\x7c\x29\x27\x9b\x05\xbb\x41\x12\xab\x6e\xa0\x17\x40\x53\xe2\x5e\xdf\xff\xfd\x16\x0e\x80\x7e\xf0\x25\x9c\xa6\xa8\x28\x3b\x8d\xa9\x9a\x98\x22\xfb\x34\x5e\xe7\xfd\x7a\x41\x06\x8f\x37\xbe\x7a\x41\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\xce\x0a\x9a\xcc\x18\xb9\x96\x13\x73\x47\x
 [...]
                },
                "/crd/bases/camel.apache.org_kamelets.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             "camel.apache.org_kamelets.yaml",
@@ -555,9 +555,9 @@ var assets = func() http.FileSystem {
                "/traits.yaml": &vfsgen۰CompressedFileInfo{
                        name:             "traits.yaml",
                        modTime:          time.Time{},
-                       uncompressedSize: 49341,
+                       uncompressedSize: 50652,
 
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x7d\xfd\x73\x5b\xb9\x91\xe0\xef\xf3\x57\xa0\xb4\x57\x65\x49\x45\x52\x9e\xc9\x26\x3b\xa7\xbb\xd9\x94\xc6\x76\x12\xcd\xf8\x43\x67\x3b\xb3\x97\x9a\x9b\x0a\xc1\xf7\x9a\x24\xcc\x47\xe0\x05\xc0\x93\xcc\xdc\xde\xff\x7e\x85\xee\xc6\xc7\x7b\x24\x25\xca\xb6\x66\xa3\xad\xdd\x54\xed\x58\xd2\x03\xd0\x68\x34\xfa\xbb\x1b\xde\x4a\xe5\xdd\xf9\x57\x63\xa1\xe5\x1a\xce\x85\x9c\xcf\x95\x56\x7e\xf3\x95\x10\x6d\x23\xfd\xdc\xd8\xf5\xb9\x
 [...]
+                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\x7d\x73\x1c\xb9\x91\x27\xfc\xff\x7c\x0a\x04\xfd\x44\x88\x64\x74\x37\x35\xe3\xb5\x3d\x0f\xef\xb4\x3e\x8e\x24\xdb\x9c\xd1\x0b\x4f\x92\xc7\xe7\xd0\x29\xdc\xe8\xaa\xec\x6e\xa8\xab\x81\x32\x80\x22\xd5\x3e\xdf\x77\xbf\x40\x66\xe2\xa5\xaa\x9b\x64\x53\x12\x67\xcd\x8d\x5d\x47\xec\x88\x64\x01\x48\x24\x12\x89\x44\xe6\x2f\x13\xde\x4a\xe5\xdd\xe9\x37\x63\xa1\xe5\x1a\x4e\x85\x9c\xcf\x95\x56\x7e\xf3\x8d\x10\x6d\x23\xfd\xdc\x
 [...]
                },
        }
        fs["/"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
diff --git a/pkg/trait/deployer.go b/pkg/trait/deployer.go
index 7735a37..67cdb79 100644
--- a/pkg/trait/deployer.go
+++ b/pkg/trait/deployer.go
@@ -17,6 +17,22 @@ limitations under the License.
 
 package trait
 
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       "net/http"
+       "strings"
+
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "k8s.io/apimachinery/pkg/types"
+
+       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+
+       "github.com/apache/camel-k/pkg/util/patch"
+)
+
 // The deployer trait is responsible for deploying the resources owned by the 
integration, and can be used
 // to explicitly select the underlying controller that will manage the 
integration pods.
 //
@@ -29,6 +45,8 @@ type deployerTrait struct {
 
 var _ ControllerStrategySelector = &deployerTrait{}
 
+var hasServerSideApply = true
+
 func newDeployerTrait() Trait {
        return &deployerTrait{
                BaseTrait: NewBaseTrait("deployer", 900),
@@ -42,9 +60,28 @@ func (t *deployerTrait) Configure(e *Environment) (bool, 
error) {
 func (t *deployerTrait) Apply(e *Environment) error {
        // Register a post action that patches the resources generated by the 
traits
        e.PostActions = append(e.PostActions, func(env *Environment) error {
-               applier := e.Client.ServerOrClientSideApplier()
                for _, resource := range env.Resources.Items() {
-                       if err := applier.Apply(e.Ctx, resource); err != nil {
+                       // We assume that server-side apply is enabled by 
default.
+                       // It is currently convoluted to check proactively 
whether server-side apply
+                       // is enabled. It is possible to fetch the OpenAPI 
endpoint, which returns
+                       // the entire server API document, then look up the 
resource PATCH endpoint, and
+                       // check its list of accepted MIME types.
+                       // As a simpler solution, we fall back to client-side 
apply at the first
+                       // 415 error, and assume server-side apply is not 
available globally.
+                       if hasServerSideApply {
+                               err := t.serverSideApply(env, resource)
+                               switch {
+                               case err == nil:
+                                       continue
+                               case isIncompatibleServerError(err):
+                                       t.L.Info("Fallback to client-side apply 
to patch resources")
+                                       hasServerSideApply = false
+                               default:
+                                       // Keep server-side apply unless server 
is incompatible with it
+                                       return err
+                               }
+                       }
+                       if err := t.clientSideApply(env, resource); err != nil {
                                return err
                        }
                }
@@ -54,6 +91,73 @@ func (t *deployerTrait) Apply(e *Environment) error {
        return nil
 }
 
+func (t *deployerTrait) serverSideApply(env *Environment, resource 
ctrl.Object) error {
+       target, err := patch.PositiveApplyPatch(resource)
+       if err != nil {
+               return err
+       }
+       err = env.Client.Patch(env.Ctx, target, ctrl.Apply, 
ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
+       if err != nil {
+               return fmt.Errorf("error during apply resource: %s/%s: %w", 
resource.GetNamespace(), resource.GetName(), err)
+       }
+       // Update the resource with the response returned from the API server
+       return t.unstructuredToRuntimeObject(target, resource)
+}
+
+func (t *deployerTrait) clientSideApply(env *Environment, resource 
ctrl.Object) error {
+       err := env.Client.Create(env.Ctx, resource)
+       if err == nil {
+               return nil
+       } else if !k8serrors.IsAlreadyExists(err) {
+               return fmt.Errorf("error during create resource: %s/%s: %w", 
resource.GetNamespace(), resource.GetName(), err)
+       }
+       object := &unstructured.Unstructured{}
+       object.SetNamespace(resource.GetNamespace())
+       object.SetName(resource.GetName())
+       object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+       err = env.Client.Get(env.Ctx, ctrl.ObjectKeyFromObject(object), object)
+       if err != nil {
+               return err
+       }
+       p, err := patch.PositiveMergePatch(object, resource)
+       if err != nil {
+               return err
+       } else if len(p) == 0 {
+               // Update the resource with the object returned from the API 
server
+               return t.unstructuredToRuntimeObject(object, resource)
+       }
+       err = env.Client.Patch(env.Ctx, resource, 
ctrl.RawPatch(types.MergePatchType, p))
+       if err != nil {
+               return fmt.Errorf("error during patch %s/%s: %w", 
resource.GetNamespace(), resource.GetName(), err)
+       }
+       return nil
+}
+
+func (t *deployerTrait) unstructuredToRuntimeObject(u 
*unstructured.Unstructured, obj ctrl.Object) error {
+       data, err := json.Marshal(u)
+       if err != nil {
+               return err
+       }
+       return json.Unmarshal(data, obj)
+}
+
+func isIncompatibleServerError(err error) bool {
+       // First simpler check for older servers (e.g. OpenShift 3.11)
+       if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+               return true
+       }
+
+       // 415: Unsupported media type means we're talking to a server which 
doesn't
+       // support server-side apply.
+       var serr *k8serrors.StatusError
+       if errors.As(err, &serr) {
+               return serr.Status().Code == http.StatusUnsupportedMediaType
+       }
+
+       // Non-StatusError means the error isn't because the server is 
incompatible.
+       return false
+}
+
 func (t *deployerTrait) SelectControllerStrategy(e *Environment) 
(*ControllerStrategy, error) {
        if IsFalse(t.Enabled) {
                return nil, nil

Reply via email to