This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit c0cc56005e0da9c27aece3c0a7a6428f6eba5948 Author: nicolaferraro <ni.ferr...@gmail.com> AuthorDate: Thu Dec 16 14:59:46 2021 +0100 Fix #1107: adding first support for Kamelets --- addons/keda/keda.go | 192 +++++++++++++++++++++++++++- pkg/apis/camel/v1alpha1/jsonschema_types.go | 4 +- pkg/client/serverside.go | 6 +- pkg/util/property/property.go | 11 ++ pkg/util/uri/uri.go | 15 ++- pkg/util/uri/uri_test.go | 67 ++++++++++ 6 files changed, 288 insertions(+), 7 deletions(-) diff --git a/addons/keda/keda.go b/addons/keda/keda.go index 834cea3..8396742 100644 --- a/addons/keda/keda.go +++ b/addons/keda/keda.go @@ -18,18 +18,42 @@ limitations under the License. package keda import ( + "fmt" + "sort" "strings" kedav1alpha1 "github.com/apache/camel-k/addons/keda/duck/v1alpha1" camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" camelv1alpha1 "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/pkg/kamelet/repository" + "github.com/apache/camel-k/pkg/metadata" + "github.com/apache/camel-k/pkg/platform" "github.com/apache/camel-k/pkg/trait" + "github.com/apache/camel-k/pkg/util" + "github.com/apache/camel-k/pkg/util/kubernetes" + "github.com/apache/camel-k/pkg/util/property" + "github.com/apache/camel-k/pkg/util/source" + "github.com/apache/camel-k/pkg/util/uri" + "github.com/pkg/errors" scase "github.com/stoewer/go-strcase" v1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + // kameletURNTypePrefix indicates the scaler type associated to a Kamelet + kameletURNTypePrefix = "urn:keda:type:" + // kameletURNMetadataPrefix allows binding Kamelet properties to Keda metadata + kameletURNMetadataPrefix = "urn:keda:metadata:" + // kameletURNRequiredTag is used to mark properties required by Keda + kameletURNRequiredTag = "urn:keda:required" + + // kameletAnnotationType is an alternative to kameletURNTypePrefix. 
+ // To be removed when the `spec -> definition -> x-descriptors` field becomes stable. + kameletAnnotationType = "camel.apache.org/keda.type" +) + // The Keda trait can be used for automatic integration with Keda autoscalers. // // The Keda trait is disabled by default. @@ -79,7 +103,14 @@ func (t *kedaTrait) Configure(e *trait.Environment) (bool, error) { return false, nil } - return true, nil + if t.Auto == nil || *t.Auto { + if err := t.populateTriggersFromKamelets(e); err != nil { + // TODO: set condition + return false, err + } + } + + return len(t.Triggers) > 0, nil } func (t *kedaTrait) Apply(e *trait.Environment) error { @@ -142,7 +173,6 @@ func (t *kedaTrait) getScaledObject(e *trait.Environment) (*kedav1alpha1.ScaledO func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error { ctrlRef := t.getTopControllerReference(e) - applier := e.Client.ServerOrClientSideApplier() if ctrlRef.Kind == camelv1alpha1.KameletBindingKind { // Update the KameletBinding directly (do not add it to env resources, it's the integration parent) key := client.ObjectKey{ @@ -156,7 +186,7 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error { if klb.Spec.Replicas == nil { one := int32(1) klb.Spec.Replicas = &one - if err := applier.Apply(e.Ctx, &klb); err != nil { + if err := e.Client.Update(e.Ctx, &klb); err != nil { return err } } @@ -164,7 +194,7 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error { if e.Integration.Spec.Replicas == nil { one := int32(1) e.Integration.Spec.Replicas = &one - if err := applier.Apply(e.Ctx, e.Integration); err != nil { + if err := e.Client.Update(e.Ctx, e.Integration); err != nil { return err } } @@ -188,3 +218,157 @@ func (t *kedaTrait) getTopControllerReference(e *trait.Environment) *v1.ObjectRe Name: e.Integration.Name, } } + +func (t *kedaTrait) populateTriggersFromKamelets(e *trait.Environment) error { + sources, err := kubernetes.ResolveIntegrationSources(e.Ctx, e.Client, e.Integration, 
e.Resources) + if err != nil { + return err + } + kameletURIs := make(map[string][]string) + metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { + for _, uri := range meta.FromURIs { + if kameletStr := source.ExtractKamelet(uri); kameletStr != "" && camelv1alpha1.ValidKameletName(kameletStr) { + kamelet := kameletStr + if strings.Contains(kamelet, "/") { + kamelet = kamelet[0:strings.Index(kamelet, "/")] + } + uriList := kameletURIs[kamelet] + util.StringSliceUniqueAdd(&uriList, uri) + sort.Strings(uriList) + kameletURIs[kamelet] = uriList + } + } + return true + }) + + if len(kameletURIs) == 0 { + return nil + } + + repo, err := repository.NewForPlatform(e.Ctx, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace()) + if err != nil { + return err + } + + sortedKamelets := make([]string, 0, len(kameletURIs)) + for kamelet, _ := range kameletURIs { + sortedKamelets = append(sortedKamelets, kamelet) + } + sort.Strings(sortedKamelets) + for _, kamelet := range sortedKamelets { + uris := kameletURIs[kamelet] + if err := t.populateTriggersFromKamelet(e, repo, kamelet, uris); err != nil { + return err + } + } + + return nil +} + +func (t *kedaTrait) populateTriggersFromKamelet(e *trait.Environment, repo repository.KameletRepository, kameletName string, uris []string) error { + kamelet, err := repo.Get(e.Ctx, kameletName) + if err != nil { + return err + } else if kamelet == nil { + return fmt.Errorf("kamelet %q not found", kameletName) + } + if kamelet.Spec.Definition == nil { + return nil + } + triggerType := t.getKedaType(kamelet) + if triggerType == "" { + return nil + } + + metadataToProperty := make(map[string]string) + requiredMetadata := make(map[string]bool) + for k, def := range kamelet.Spec.Definition.Properties { + if metadataName := t.getXDescriptorValue(def.XDescriptors, kameletURNMetadataPrefix); metadataName != "" { + metadataToProperty[metadataName] = k + if req := 
t.isXDescriptorPresent(def.XDescriptors, kameletURNRequiredTag); req { + requiredMetadata[metadataName] = true + } + } + } + for _, uri := range uris { + if err := t.populateTriggersFromKameletURI(e, kameletName, triggerType, metadataToProperty, requiredMetadata, uri); err != nil { + return err + } + } + return nil +} + +func (t *kedaTrait) populateTriggersFromKameletURI(e *trait.Environment, kameletName string, triggerType string, metadataToProperty map[string]string, requiredMetadata map[string]bool, kameletURI string) error { + metaValues := make(map[string]string, len(metadataToProperty)) + for metaParam, prop := range metadataToProperty { + // From lowest priority to top + if v := e.ApplicationProperties[fmt.Sprintf("camel.kamelet.%s.%s", kameletName, prop)]; v != "" { + metaValues[metaParam] = v + } + if kameletID := uri.GetPathSegment(kameletURI, 0); kameletID != "" { + kameletSpecificKey := fmt.Sprintf("camel.kamelet.%s.%s.%s", kameletName, kameletID, prop) + if v := e.ApplicationProperties[kameletSpecificKey]; v != "" { + metaValues[metaParam] = v + } + for _, c := range e.Integration.Spec.Configuration { + if c.Type == "property" && strings.HasPrefix(c.Value, kameletSpecificKey) { + v, err := property.DecodePropertyFileValue(c.Value, kameletSpecificKey) + if err != nil { + return errors.Wrapf(err, "could not decode property %q", kameletSpecificKey) + } + metaValues[metaParam] = v + } + } + } + if v := uri.GetQueryParameter(kameletURI, prop); v != "" { + metaValues[metaParam] = v + } + } + + for req := range requiredMetadata { + if _, ok := metaValues[req]; !ok { + return fmt.Errorf("metadata parameter %q is missing in configuration: it is required by Keda", req) + } + } + + kebabMetaValues := make(map[string]string, len(metaValues)) + for k, v := range metaValues { + kebabMetaValues[scase.KebabCase(k)] = v + } + + // Add the trigger in config + trigger := kedaTrigger{ + Type: triggerType, + Metadata: kebabMetaValues, + } + t.Triggers = append(t.Triggers, 
trigger) + return nil +} + +func (t *kedaTrait) getKedaType(kamelet *camelv1alpha1.Kamelet) string { + if kamelet.Spec.Definition != nil { + triggerType := t.getXDescriptorValue(kamelet.Spec.Definition.XDescriptors, kameletURNTypePrefix) + if triggerType != "" { + return triggerType + } + } + return kamelet.Annotations[kameletAnnotationType] +} + +func (t *kedaTrait) getXDescriptorValue(descriptors []string, prefix string) string { + for _, d := range descriptors { + if strings.HasPrefix(d, prefix) { + return d[len(prefix):] + } + } + return "" +} + +func (t *kedaTrait) isXDescriptorPresent(descriptors []string, desc string) bool { + for _, d := range descriptors { + if d == desc { + return true + } + } + return false +} diff --git a/pkg/apis/camel/v1alpha1/jsonschema_types.go b/pkg/apis/camel/v1alpha1/jsonschema_types.go index 5e90f4f..87e178b 100644 --- a/pkg/apis/camel/v1alpha1/jsonschema_types.go +++ b/pkg/apis/camel/v1alpha1/jsonschema_types.go @@ -74,7 +74,7 @@ type JSONSchemaProp struct { Enum []JSON `json:"enum,omitempty"` Example *JSON `json:"example,omitempty"` Nullable bool `json:"nullable,omitempty"` - // The list of descriptors that determine which UI components to use on different views + // XDescriptors is a list of extended properties that trigger a custom behavior in external systems XDescriptors []string `json:"x-descriptors,omitempty"` } @@ -89,6 +89,8 @@ type JSONSchemaProps struct { ExternalDocs *ExternalDocumentation `json:"externalDocs,omitempty"` Schema JSONSchemaURL `json:"$schema,omitempty"` Type string `json:"type,omitempty"` + // XDescriptors is a list of extended properties that trigger a custom behavior in external systems + XDescriptors []string `json:"x-descriptors,omitempty"` } // RawMessage is a raw encoded JSON value. 
diff --git a/pkg/client/serverside.go b/pkg/client/serverside.go index 6efd758..bca029d 100644 --- a/pkg/client/serverside.go +++ b/pkg/client/serverside.go @@ -49,6 +49,7 @@ func (c *defaultClient) ServerOrClientSideApplier() ServerOrClientSideApplier { func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object ctrl.Object) error { once := false var err error + needsRetry := false a.tryServerSideApply.Do(func() { once = true if err = a.serverSideApply(ctx, object); err != nil { @@ -57,12 +58,15 @@ func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object ctrl.Objec a.hasServerSideApply.Store(false) err = nil } else { - a.tryServerSideApply = sync.Once{} + needsRetry = true } } else { a.hasServerSideApply.Store(true) } }) + if needsRetry { + a.tryServerSideApply = sync.Once{} + } if err != nil { return err } diff --git a/pkg/util/property/property.go b/pkg/util/property/property.go index 7f02b7a..87fcc1a 100644 --- a/pkg/util/property/property.go +++ b/pkg/util/property/property.go @@ -65,3 +65,14 @@ func SplitPropertyFileEntry(entry string) (string, string) { } return k, v } + +// DecodePropertyFileValue returns the decoded value corresponding to the given key in the entry. +func DecodePropertyFileValue(entry, key string) (string, error) { + p := properties.NewProperties() + p.DisableExpansion = true + if err := p.Load([]byte(entry), properties.UTF8); err != nil { + return "", err + } + val, _ := p.Get(key) + return val, nil +} diff --git a/pkg/util/uri/uri.go b/pkg/util/uri/uri.go index 4e722c6..210f169 100644 --- a/pkg/util/uri/uri.go +++ b/pkg/util/uri/uri.go @@ -28,7 +28,7 @@ import ( ) var uriRegexp = regexp.MustCompile(`^[a-z0-9+][a-zA-Z0-9-+]*:.*$`) - +var pathExtractorRegexp = regexp.MustCompile(`^[a-z0-9+][a-zA-Z0-9-+]*:(?://){0,1}[^/?]+/([^?]+)(?:[?].*){0,1}$`) var queryExtractorRegexp = `^[^?]+\?(?:|.*[&])%s=([^&]+)(?:[&].*|$)` // HasCamelURIFormat tells if a given string may belong to a Camel URI, without checking any catalog. 
@@ -57,6 +57,19 @@ func GetQueryParameter(uri string, param string) string { return res } +// GetPathSegment returns the path segment of the URI corresponding to the given position (0 based), if present +func GetPathSegment(uri string, pos int) string { + match := pathExtractorRegexp.FindStringSubmatch(uri) + if len(match) > 1 { + fullPath := match[1] + parts := strings.Split(fullPath, "/") + if pos >= 0 && pos < len(parts) { + return parts[pos] + } + } + return "" +} + func matchOrEmpty(reg *regexp.Regexp, str string) string { match := reg.FindStringSubmatch(str) if len(match) > 1 { diff --git a/pkg/util/uri/uri_test.go b/pkg/util/uri/uri_test.go index 4000bf3..49ecb0c 100644 --- a/pkg/util/uri/uri_test.go +++ b/pkg/util/uri/uri_test.go @@ -180,3 +180,70 @@ func TestCamelURIFormat(t *testing.T) { }) } } + +func TestPathSegment(t *testing.T) { + tests := []struct { + uri string + pos int + expected string + }{ + { + uri: "direct:endpoint", + pos: 0, + }, + { + uri: "direct:endpoint", + pos: 12, + }, + { + uri: "kamelet:endpoint/", + pos: 0, + }, + { + uri: "kamelet:endpoint/s", + pos: 0, + expected: "s", + }, + { + uri: "kamelet:endpoint/s", + pos: 1, + }, + { + uri: "kamelet://endpoint/s", + pos: 0, + expected: "s", + }, + { + uri: "kamelet://endpoint/s/p", + pos: 0, + expected: "s", + }, + { + uri: "kamelet://endpoint/s/p", + pos: 1, + expected: "p", + }, + { + uri: "kamelet://endpoint/s/p?param=n", + pos: 1, + expected: "p", + }, + { + uri: "kamelet://endpoint/s/p?param=n&p2=n2", + pos: 1, + expected: "p", + }, + { + uri: "kamelet://endpoint/s/p?param=n&p2=n2", + pos: 2, + }, + } + + for _, test := range tests { + thetest := test + t.Run(thetest.uri, func(t *testing.T) { + param := GetPathSegment(thetest.uri, thetest.pos) + assert.Equal(t, thetest.expected, param) + }) + } +}