This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 885d2bd99ca5ce46bad901df5082d6e424a1500b Author: nicolaferraro <ni.ferr...@gmail.com> AuthorDate: Mon Jan 10 14:43:43 2022 +0100 Fix #1107: disable applier code to detect real CI errors --- pkg/install/kamelets.go | 94 +++++++++++++++++++++++++++++++++++++-- pkg/resources/resources.go | 12 ++--- pkg/trait/deployer.go | 108 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 203 insertions(+), 11 deletions(-) diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go index 4ff4572..82a818b 100644 --- a/pkg/install/kamelets.go +++ b/pkg/install/kamelets.go @@ -19,21 +19,33 @@ package install import ( "context" + "errors" "fmt" "io/fs" + "net/http" "os" "path" "path/filepath" "strings" + "sync" + "sync/atomic" "golang.org/x/sync/errgroup" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" "github.com/apache/camel-k/pkg/util" "github.com/apache/camel-k/pkg/util/defaults" "github.com/apache/camel-k/pkg/util/kubernetes" - "k8s.io/apimachinery/pkg/runtime" + "github.com/apache/camel-k/pkg/util/patch" ) const ( @@ -41,6 +53,13 @@ const ( defaultKameletDir = "/kamelets/" ) +var ( + log = logf.Log + + hasServerSideApply atomic.Value + tryServerSideApply sync.Once +) + // KameletCatalog installs the bundled Kamelets into the specified namespace. func KameletCatalog(ctx context.Context, c client.Client, namespace string) error { kameletDir := os.Getenv(kameletDirEnv) @@ -58,7 +77,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro } g, gCtx := errgroup.WithContext(ctx) - applier := c.ServerOrClientSideApplier() + err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error { if err != nil { return err @@ -75,9 +94,31 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro if err != nil { return err } - if err := applier.Apply(gCtx, kamelet); err != nil { + once := false + tryServerSideApply.Do(func() { + once = true + if err = serverSideApply(gCtx, c, kamelet); err != nil { + if isIncompatibleServerError(err) { + log.Info("Fallback to client-side apply for installing bundled Kamelets") + hasServerSideApply.Store(false) + err = nil + } else { + tryServerSideApply = sync.Once{} + } + } else { + hasServerSideApply.Store(true) + } + }) + if err != nil { return err } + if v := hasServerSideApply.Load(); v.(bool) { + if !once { + return serverSideApply(gCtx, c, kamelet) + } + } else { + return clientSideApply(gCtx, c, kamelet) + } return nil }) return nil @@ -89,6 +130,53 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro return g.Wait() } +func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error { + target, err := patch.PositiveApplyPatch(resource) + if err != nil { + return err + } + return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")) +} + +func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error { + err := c.Create(ctx, resource) + if err == nil { + return nil + } else if !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err) + } + object := &unstructured.Unstructured{} + object.SetNamespace(resource.GetNamespace()) + object.SetName(resource.GetName()) + object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind()) + err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object) + if err != nil { + return err + } + p, err := patch.PositiveMergePatch(object, resource) + if err != nil { + return err + } else if len(p) == 0 { + return nil + } + return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p)) +} + +func isIncompatibleServerError(err error) bool { + // First simpler check for older servers (i.e. OpenShift 3.11) + if strings.Contains(err.Error(), "415: Unsupported Media Type") { + return true + } + // 415: Unsupported media type means we're talking to a server which doesn't + // support server-side apply. + var serr *k8serrors.StatusError + if errors.As(err, &serr) { + return serr.Status().Code == http.StatusUnsupportedMediaType + } + // Non-StatusError means the error isn't because the server is incompatible. + return false +} + func loadKamelet(path string, namespace string, scheme *runtime.Scheme) (*v1alpha1.Kamelet, error) { content, err := util.ReadFile(path) if err != nil { diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go index e64bea2..bcc7095 100644 --- a/pkg/resources/resources.go +++ b/pkg/resources/resources.go @@ -145,16 +145,16 @@ var assets = func() http.FileSystem { "/crd/bases/camel.apache.org_integrations.yaml": &vfsgen۰CompressedFileInfo{ name: "camel.apache.org_integrations.yaml", modTime: time.Time{}, - uncompressedSize: 366985, + uncompressedSize: 367530, - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x8a\x53\x4e\xea\x93\xb4\x11\x29\x3b\x99\x9d\xbb\xe3\x3b\xf5\xa5\x34\x92\x9c\xd5\x8d\x2d\xab\x2c\x25\xf9\x52\x4e\x36\x0b\x76\x83\x24\x56\xdd\x40\x2f\x80\xa6\xcc\xbd\xbe\xff\xfb\x2d\x1c\x00\xfd\xe0\xab\x81\x16\xe9\x38\x53\x8d\xa9\x9a\x98\x14\xfb\x34\x1e\xe7\x7d\x0e\xce\xf9\x12\x46\xfb\x1b\x5f\x7c\x09\xaf\x59\x42\xb9\xa2\x29\x68\x01\x7a\x4e\xe1\xbc\x20\xc9\x9c\xc2\x9d\x98\xea\x [...] + compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x8a\x53\x4e\xea\x93\xb4\x11\x29\x3b\x99\x9d\xbb\xe3\x3b\xf5\xa5\x34\x92\x9c\xd5\x8d\x2d\xab\x2c\x25\xf9\x52\x4e\x36\x0b\x76\x83\x24\x56\xdd\x40\x2f\x80\xa6\xcc\xbd\xbe\xff\xfb\x2d\x1c\x00\xfd\xe0\xab\x81\x16\xe9\x38\x53\x8d\xa9\x9a\x98\x14\xfb\x34\x1e\xe7\x7d\x0e\xce\xf9\x12\x46\xfb\x1b\x5f\x7c\x09\xaf\x59\x42\xb9\xa2\x29\x68\x01\x7a\x4e\xe1\xbc\x20\xc9\x9c\xc2\x9d\x98\xea\x [...] }, "/crd/bases/camel.apache.org_kameletbindings.yaml": &vfsgen۰CompressedFileInfo{ name: "camel.apache.org_kameletbindings.yaml", modTime: time.Time{}, - uncompressedSize: 432125, + uncompressedSize: 432720, - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x0a\x94\x9c\xfa\x24\x6d\x44\xca\xce\xcc\xce\xdd\xf1\x9d\xfa\x52\x1a\x59\xce\xe8\xc6\x96\x59\x96\xe2\x7c\x29\x27\x9b\x05\xbb\x41\x12\xab\x6e\xa0\x17\x40\x53\xe2\x5e\xdf\xff\xfd\x16\x0e\x80\x7e\xf0\x25\x9c\xa6\xa8\x28\x3b\x8d\xa9\x9a\x98\x22\xfb\x34\x5e\xe7\xfd\x7a\x41\x06\x8f\x37\xbe\x7a\x41\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\xce\x0a\x9a\xcc\x18\xb9\x96\x13\x73\x47\x [...] + compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\xfb\x73\x1b\x37\x96\x30\xfa\x7b\xfe\x0a\x94\x9c\xfa\x24\x6d\x44\xca\xce\xcc\xce\xdd\xf1\x9d\xfa\x52\x1a\x59\xce\xe8\xc6\x96\x59\x96\xe2\x7c\x29\x27\x9b\x05\xbb\x41\x12\xab\x6e\xa0\x17\x40\x53\xe2\x5e\xdf\xff\xfd\x16\x0e\x80\x7e\xf0\x25\x9c\xa6\xa8\x28\x3b\x8d\xa9\x9a\x98\x22\xfb\x34\x5e\xe7\xfd\x7a\x41\x06\x8f\x37\xbe\x7a\x41\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\xce\x0a\x9a\xcc\x18\xb9\x96\x13\x73\x47\x [...] }, "/crd/bases/camel.apache.org_kamelets.yaml": &vfsgen۰CompressedFileInfo{ name: "camel.apache.org_kamelets.yaml", @@ -555,9 +555,9 @@ var assets = func() http.FileSystem { "/traits.yaml": &vfsgen۰CompressedFileInfo{ name: "traits.yaml", modTime: time.Time{}, - uncompressedSize: 49341, + uncompressedSize: 50652, - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x7d\xfd\x73\x5b\xb9\x91\xe0\xef\xf3\x57\xa0\xb4\x57\x65\x49\x45\x52\x9e\xc9\x26\x3b\xa7\xbb\xd9\x94\xc6\x76\x12\xcd\xf8\x43\x67\x3b\xb3\x97\x9a\x9b\x0a\xc1\xf7\x9a\x24\xcc\x47\xe0\x05\xc0\x93\xcc\xdc\xde\xff\x7e\x85\xee\xc6\xc7\x7b\x24\x25\xca\xb6\x66\xa3\xad\xdd\x54\xed\x58\xd2\x03\xd0\x68\x34\xfa\xbb\x1b\xde\x4a\xe5\xdd\xf9\x57\x63\xa1\xe5\x1a\xce\x85\x9c\xcf\x95\x56\x7e\xf3\x95\x10\x6d\x23\xfd\xdc\xd8\xf5\xb9\x [...] + compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xbd\x7d\x73\x1c\xb9\x91\x27\xfc\xff\x7c\x0a\x04\xfd\x44\x88\x64\x74\x37\x35\xe3\xb5\x3d\x0f\xef\xb4\x3e\x8e\x24\xdb\x9c\xd1\x0b\x4f\x92\xc7\xe7\xd0\x29\xdc\xe8\xaa\xec\x6e\xa8\xab\x81\x32\x80\x22\xd5\x3e\xdf\x77\xbf\x40\x66\xe2\xa5\xaa\x9b\x64\x53\x12\x67\xcd\x8d\x5d\x47\xec\x88\x64\x01\x48\x24\x12\x89\x44\xe6\x2f\x13\xde\x4a\xe5\xdd\xe9\x37\x63\xa1\xe5\x1a\x4e\x85\x9c\xcf\x95\x56\x7e\xf3\x8d\x10\x6d\x23\xfd\xdc\x [...] }, } fs["/"].(*vfsgen۰DirInfo).entries = []os.FileInfo{ diff --git a/pkg/trait/deployer.go b/pkg/trait/deployer.go index 7735a37..67cdb79 100644 --- a/pkg/trait/deployer.go +++ b/pkg/trait/deployer.go @@ -17,6 +17,22 @@ limitations under the License. package trait +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/apache/camel-k/pkg/util/patch" +) + // The deployer trait is responsible for deploying the resources owned by the integration, and can be used // to explicitly select the underlying controller that will manage the integration pods. // @@ -29,6 +45,8 @@ type deployerTrait struct { var _ ControllerStrategySelector = &deployerTrait{} +var hasServerSideApply = true + func newDeployerTrait() Trait { return &deployerTrait{ BaseTrait: NewBaseTrait("deployer", 900), @@ -42,9 +60,28 @@ func (t *deployerTrait) Configure(e *Environment) (bool, error) { func (t *deployerTrait) Apply(e *Environment) error { // Register a post action that patches the resources generated by the traits e.PostActions = append(e.PostActions, func(env *Environment) error { - applier := e.Client.ServerOrClientSideApplier() for _, resource := range env.Resources.Items() { - if err := applier.Apply(e.Ctx, resource); err != nil { + // We assume that server-side apply is enabled by default. + // It is currently convoluted to check pro-actively whether server-side apply + // is enabled. This is possible to fetch the OpenAPI endpoint, which returns + // the entire server API document, then lookup the resource PATCH endpoint, and + // check its list of accepted MIME types. + // As a simpler solution, we fall back to client-side apply at the first + // 415 error, and assume server-side apply is not available globally. + if hasServerSideApply { + err := t.serverSideApply(env, resource) + switch { + case err == nil: + continue + case isIncompatibleServerError(err): + t.L.Info("Fallback to client-side apply to patch resources") + hasServerSideApply = false + default: + // Keep server-side apply unless server is incompatible with it + return err + } + } + if err := t.clientSideApply(env, resource); err != nil { return err } } @@ -54,6 +91,73 @@ func (t *deployerTrait) Apply(e *Environment) error { return nil } +func (t *deployerTrait) serverSideApply(env *Environment, resource ctrl.Object) error { + target, err := patch.PositiveApplyPatch(resource) + if err != nil { + return err + } + err = env.Client.Patch(env.Ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")) + if err != nil { + return fmt.Errorf("error during apply resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err) + } + // Update the resource with the response returned from the API server + return t.unstructuredToRuntimeObject(target, resource) +} + +func (t *deployerTrait) clientSideApply(env *Environment, resource ctrl.Object) error { + err := env.Client.Create(env.Ctx, resource) + if err == nil { + return nil + } else if !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err) + } + object := &unstructured.Unstructured{} + object.SetNamespace(resource.GetNamespace()) + object.SetName(resource.GetName()) + object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind()) + err = env.Client.Get(env.Ctx, ctrl.ObjectKeyFromObject(object), object) + if err != nil { + return err + } + p, err := patch.PositiveMergePatch(object, resource) + if err != nil { + return err + } else if len(p) == 0 { + // Update the resource with the object returned from the API server + return t.unstructuredToRuntimeObject(object, resource) + } + err = env.Client.Patch(env.Ctx, resource, ctrl.RawPatch(types.MergePatchType, p)) + if err != nil { + return fmt.Errorf("error during patch %s/%s: %w", resource.GetNamespace(), resource.GetName(), err) + } + return nil +} + +func (t *deployerTrait) unstructuredToRuntimeObject(u *unstructured.Unstructured, obj ctrl.Object) error { + data, err := json.Marshal(u) + if err != nil { + return err + } + return json.Unmarshal(data, obj) +} + +func isIncompatibleServerError(err error) bool { + // First simpler check for older servers (i.e. OpenShift 3.11) + if strings.Contains(err.Error(), "415: Unsupported Media Type") { + return true + } + + // 415: Unsupported media type means we're talking to a server which doesn't + // support server-side apply. + var serr *k8serrors.StatusError + if errors.As(err, &serr) { + return serr.Status().Code == http.StatusUnsupportedMediaType + } + + // Non-StatusError means the error isn't because the server is incompatible. + return false +} + func (t *deployerTrait) SelectControllerStrategy(e *Environment) (*ControllerStrategy, error) { if IsFalse(t.Enabled) { return nil, nil