This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit c0cc56005e0da9c27aece3c0a7a6428f6eba5948
Author: nicolaferraro <ni.ferr...@gmail.com>
AuthorDate: Thu Dec 16 14:59:46 2021 +0100

    Fix #1107: adding first support for Kamelets
---
 addons/keda/keda.go                         | 192 +++++++++++++++++++++++++++-
 pkg/apis/camel/v1alpha1/jsonschema_types.go |   4 +-
 pkg/client/serverside.go                    |   6 +-
 pkg/util/property/property.go               |  11 ++
 pkg/util/uri/uri.go                         |  15 ++-
 pkg/util/uri/uri_test.go                    |  67 ++++++++++
 6 files changed, 288 insertions(+), 7 deletions(-)

diff --git a/addons/keda/keda.go b/addons/keda/keda.go
index 834cea3..8396742 100644
--- a/addons/keda/keda.go
+++ b/addons/keda/keda.go
@@ -18,18 +18,42 @@ limitations under the License.
 package keda
 
 import (
+       "fmt"
+       "sort"
        "strings"
 
        kedav1alpha1 "github.com/apache/camel-k/addons/keda/duck/v1alpha1"
        camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
        camelv1alpha1 "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+       "github.com/apache/camel-k/pkg/kamelet/repository"
+       "github.com/apache/camel-k/pkg/metadata"
+       "github.com/apache/camel-k/pkg/platform"
        "github.com/apache/camel-k/pkg/trait"
+       "github.com/apache/camel-k/pkg/util"
+       "github.com/apache/camel-k/pkg/util/kubernetes"
+       "github.com/apache/camel-k/pkg/util/property"
+       "github.com/apache/camel-k/pkg/util/source"
+       "github.com/apache/camel-k/pkg/util/uri"
+       "github.com/pkg/errors"
        scase "github.com/stoewer/go-strcase"
        v1 "k8s.io/api/core/v1"
        "sigs.k8s.io/controller-runtime/pkg/client"
 )
 
+const (
+       // kameletURNTypePrefix indicates the scaler type associated to a 
Kamelet
+       kameletURNTypePrefix = "urn:keda:type:"
+       // kameletURNMetadataPrefix allows binding Kamelet properties to Keda 
metadata
+       kameletURNMetadataPrefix = "urn:keda:metadata:"
+       // kameletURNRequiredTag is used to mark properties required by Keda
+       kameletURNRequiredTag = "urn:keda:required"
+
+       // kameletAnnotationType is an alternative to kameletURNTypePrefix.
+       // To be removed when the `spec -> definition -> x-descriptors` field 
becomes stable.
+       kameletAnnotationType = "camel.apache.org/keda.type"
+)
+
 // The Keda trait can be used for automatic integration with Keda autoscalers.
 //
 // The Keda trait is disabled by default.
@@ -79,7 +103,14 @@ func (t *kedaTrait) Configure(e *trait.Environment) (bool, 
error) {
                return false, nil
        }
 
-       return true, nil
+       if t.Auto == nil || *t.Auto {
+               if err := t.populateTriggersFromKamelets(e); err != nil {
+                       // TODO: set condition
+                       return false, err
+               }
+       }
+
+       return len(t.Triggers) > 0, nil
 }
 
 func (t *kedaTrait) Apply(e *trait.Environment) error {
@@ -142,7 +173,6 @@ func (t *kedaTrait) getScaledObject(e *trait.Environment) 
(*kedav1alpha1.ScaledO
 
 func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
        ctrlRef := t.getTopControllerReference(e)
-       applier := e.Client.ServerOrClientSideApplier()
        if ctrlRef.Kind == camelv1alpha1.KameletBindingKind {
                // Update the KameletBinding directly (do not add it to env 
resources, it's the integration parent)
                key := client.ObjectKey{
@@ -156,7 +186,7 @@ func (t *kedaTrait) hackControllerReplicas(e 
*trait.Environment) error {
                if klb.Spec.Replicas == nil {
                        one := int32(1)
                        klb.Spec.Replicas = &one
-                       if err := applier.Apply(e.Ctx, &klb); err != nil {
+                       if err := e.Client.Update(e.Ctx, &klb); err != nil {
                                return err
                        }
                }
@@ -164,7 +194,7 @@ func (t *kedaTrait) hackControllerReplicas(e 
*trait.Environment) error {
                if e.Integration.Spec.Replicas == nil {
                        one := int32(1)
                        e.Integration.Spec.Replicas = &one
-                       if err := applier.Apply(e.Ctx, e.Integration); err != 
nil {
+                       if err := e.Client.Update(e.Ctx, e.Integration); err != 
nil {
                                return err
                        }
                }
@@ -188,3 +218,157 @@ func (t *kedaTrait) getTopControllerReference(e 
*trait.Environment) *v1.ObjectRe
                Name:       e.Integration.Name,
        }
 }
+
+func (t *kedaTrait) populateTriggersFromKamelets(e *trait.Environment) error {
+       sources, err := kubernetes.ResolveIntegrationSources(e.Ctx, e.Client, 
e.Integration, e.Resources)
+       if err != nil {
+               return err
+       }
+       kameletURIs := make(map[string][]string)
+       metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
+               for _, uri := range meta.FromURIs {
+                       if kameletStr := source.ExtractKamelet(uri); kameletStr 
!= "" && camelv1alpha1.ValidKameletName(kameletStr) {
+                               kamelet := kameletStr
+                               if strings.Contains(kamelet, "/") {
+                                       kamelet = 
kamelet[0:strings.Index(kamelet, "/")]
+                               }
+                               uriList := kameletURIs[kamelet]
+                               util.StringSliceUniqueAdd(&uriList, uri)
+                               sort.Strings(uriList)
+                               kameletURIs[kamelet] = uriList
+                       }
+               }
+               return true
+       })
+
+       if len(kameletURIs) == 0 {
+               return nil
+       }
+
+       repo, err := repository.NewForPlatform(e.Ctx, e.Client, e.Platform, 
e.Integration.Namespace, platform.GetOperatorNamespace())
+       if err != nil {
+               return err
+       }
+
+       sortedKamelets := make([]string, 0, len(kameletURIs))
+       for kamelet, _ := range kameletURIs {
+               sortedKamelets = append(sortedKamelets, kamelet)
+       }
+       sort.Strings(sortedKamelets)
+       for _, kamelet := range sortedKamelets {
+               uris := kameletURIs[kamelet]
+               if err := t.populateTriggersFromKamelet(e, repo, kamelet, 
uris); err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func (t *kedaTrait) populateTriggersFromKamelet(e *trait.Environment, repo 
repository.KameletRepository, kameletName string, uris []string) error {
+       kamelet, err := repo.Get(e.Ctx, kameletName)
+       if err != nil {
+               return err
+       } else if kamelet == nil {
+               return fmt.Errorf("kamelet %q not found", kameletName)
+       }
+       if kamelet.Spec.Definition == nil {
+               return nil
+       }
+       triggerType := t.getKedaType(kamelet)
+       if triggerType == "" {
+               return nil
+       }
+
+       metadataToProperty := make(map[string]string)
+       requiredMetadata := make(map[string]bool)
+       for k, def := range kamelet.Spec.Definition.Properties {
+               if metadataName := t.getXDescriptorValue(def.XDescriptors, 
kameletURNMetadataPrefix); metadataName != "" {
+                       metadataToProperty[metadataName] = k
+                       if req := t.isXDescriptorPresent(def.XDescriptors, 
kameletURNRequiredTag); req {
+                               requiredMetadata[metadataName] = true
+                       }
+               }
+       }
+       for _, uri := range uris {
+               if err := t.populateTriggersFromKameletURI(e, kameletName, 
triggerType, metadataToProperty, requiredMetadata, uri); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (t *kedaTrait) populateTriggersFromKameletURI(e *trait.Environment, 
kameletName string, triggerType string, metadataToProperty map[string]string, 
requiredMetadata map[string]bool, kameletURI string) error {
+       metaValues := make(map[string]string, len(metadataToProperty))
+       for metaParam, prop := range metadataToProperty {
+               // From lowest priority to top
+               if v := 
e.ApplicationProperties[fmt.Sprintf("camel.kamelet.%s.%s", kameletName, prop)]; 
v != "" {
+                       metaValues[metaParam] = v
+               }
+               if kameletID := uri.GetPathSegment(kameletURI, 0); kameletID != 
"" {
+                       kameletSpecificKey := 
fmt.Sprintf("camel.kamelet.%s.%s.%s", kameletName, kameletID, prop)
+                       if v := e.ApplicationProperties[kameletSpecificKey]; v 
!= "" {
+                               metaValues[metaParam] = v
+                       }
+                       for _, c := range e.Integration.Spec.Configuration {
+                               if c.Type == "property" && 
strings.HasPrefix(c.Value, kameletSpecificKey) {
+                                       v, err := 
property.DecodePropertyFileValue(c.Value, kameletSpecificKey)
+                                       if err != nil {
+                                               return errors.Wrapf(err, "could 
not decode property %q", kameletSpecificKey)
+                                       }
+                                       metaValues[metaParam] = v
+                               }
+                       }
+               }
+               if v := uri.GetQueryParameter(kameletURI, prop); v != "" {
+                       metaValues[metaParam] = v
+               }
+       }
+
+       for req := range requiredMetadata {
+               if _, ok := metaValues[req]; !ok {
+                       return fmt.Errorf("metadata parameter %q is missing in 
configuration: it is required by Keda", req)
+               }
+       }
+
+       kebabMetaValues := make(map[string]string, len(metaValues))
+       for k, v := range metaValues {
+               kebabMetaValues[scase.KebabCase(k)] = v
+       }
+
+       // Add the trigger in config
+       trigger := kedaTrigger{
+               Type:     triggerType,
+               Metadata: kebabMetaValues,
+       }
+       t.Triggers = append(t.Triggers, trigger)
+       return nil
+}
+
+func (t *kedaTrait) getKedaType(kamelet *camelv1alpha1.Kamelet) string {
+       if kamelet.Spec.Definition != nil {
+               triggerType := 
t.getXDescriptorValue(kamelet.Spec.Definition.XDescriptors, 
kameletURNTypePrefix)
+               if triggerType != "" {
+                       return triggerType
+               }
+       }
+       return kamelet.Annotations[kameletAnnotationType]
+}
+
+func (t *kedaTrait) getXDescriptorValue(descriptors []string, prefix string) 
string {
+       for _, d := range descriptors {
+               if strings.HasPrefix(d, prefix) {
+                       return d[len(prefix):]
+               }
+       }
+       return ""
+}
+
+func (t *kedaTrait) isXDescriptorPresent(descriptors []string, desc string) 
bool {
+       for _, d := range descriptors {
+               if d == desc {
+                       return true
+               }
+       }
+       return false
+}
diff --git a/pkg/apis/camel/v1alpha1/jsonschema_types.go 
b/pkg/apis/camel/v1alpha1/jsonschema_types.go
index 5e90f4f..87e178b 100644
--- a/pkg/apis/camel/v1alpha1/jsonschema_types.go
+++ b/pkg/apis/camel/v1alpha1/jsonschema_types.go
@@ -74,7 +74,7 @@ type JSONSchemaProp struct {
        Enum             []JSON       `json:"enum,omitempty"`
        Example          *JSON        `json:"example,omitempty"`
        Nullable         bool         `json:"nullable,omitempty"`
-       // The list of descriptors that determine which UI components to use on 
different views
+       // XDescriptors is a list of extended properties that trigger a custom 
behavior in external systems
        XDescriptors []string `json:"x-descriptors,omitempty"`
 }
 
@@ -89,6 +89,8 @@ type JSONSchemaProps struct {
        ExternalDocs *ExternalDocumentation    `json:"externalDocs,omitempty"`
        Schema       JSONSchemaURL             `json:"$schema,omitempty"`
        Type         string                    `json:"type,omitempty"`
+       // XDescriptors is a list of extended properties that trigger a custom 
behavior in external systems
+       XDescriptors []string `json:"x-descriptors,omitempty"`
 }
 
 // RawMessage is a raw encoded JSON value.
diff --git a/pkg/client/serverside.go b/pkg/client/serverside.go
index 6efd758..bca029d 100644
--- a/pkg/client/serverside.go
+++ b/pkg/client/serverside.go
@@ -49,6 +49,7 @@ func (c *defaultClient) ServerOrClientSideApplier() 
ServerOrClientSideApplier {
 func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object 
ctrl.Object) error {
        once := false
        var err error
+       needsRetry := false
        a.tryServerSideApply.Do(func() {
                once = true
                if err = a.serverSideApply(ctx, object); err != nil {
@@ -57,12 +58,15 @@ func (a *ServerOrClientSideApplier) Apply(ctx 
context.Context, object ctrl.Objec
                                a.hasServerSideApply.Store(false)
                                err = nil
                        } else {
-                               a.tryServerSideApply = sync.Once{}
+                               needsRetry = true
                        }
                } else {
                        a.hasServerSideApply.Store(true)
                }
        })
+       if needsRetry {
+               a.tryServerSideApply = sync.Once{}
+       }
        if err != nil {
                return err
        }
diff --git a/pkg/util/property/property.go b/pkg/util/property/property.go
index 7f02b7a..87fcc1a 100644
--- a/pkg/util/property/property.go
+++ b/pkg/util/property/property.go
@@ -65,3 +65,14 @@ func SplitPropertyFileEntry(entry string) (string, string) {
        }
        return k, v
 }
+
+// DecodePropertyFileEntry returns the decoded value corresponding to the 
given key in the entry.
+func DecodePropertyFileValue(entry, key string) (string, error) {
+       p := properties.NewProperties()
+       p.DisableExpansion = true
+       if err := p.Load([]byte(entry), properties.UTF8); err != nil {
+               return "", err
+       }
+       val, _ := p.Get(key)
+       return val, nil
+}
diff --git a/pkg/util/uri/uri.go b/pkg/util/uri/uri.go
index 4e722c6..210f169 100644
--- a/pkg/util/uri/uri.go
+++ b/pkg/util/uri/uri.go
@@ -28,7 +28,7 @@ import (
 )
 
 var uriRegexp = regexp.MustCompile(`^[a-z0-9+][a-zA-Z0-9-+]*:.*$`)
-
+var pathExtractorRegexp = 
regexp.MustCompile(`^[a-z0-9+][a-zA-Z0-9-+]*:(?://){0,1}[^/?]+/([^?]+)(?:[?].*){0,1}$`)
 var queryExtractorRegexp = `^[^?]+\?(?:|.*[&])%s=([^&]+)(?:[&].*|$)`
 
 // HasCamelURIFormat tells if a given string may belong to a Camel URI, 
without checking any catalog.
@@ -57,6 +57,19 @@ func GetQueryParameter(uri string, param string) string {
        return res
 }
 
+// GetPathSegment returns the path segment of the URI corresponding to the 
given position (0 based), if present
+func GetPathSegment(uri string, pos int) string {
+       match := pathExtractorRegexp.FindStringSubmatch(uri)
+       if len(match) > 1 {
+               fullPath := match[1]
+               parts := strings.Split(fullPath, "/")
+               if pos >= 0 && pos < len(parts) {
+                       return parts[pos]
+               }
+       }
+       return ""
+}
+
 func matchOrEmpty(reg *regexp.Regexp, str string) string {
        match := reg.FindStringSubmatch(str)
        if len(match) > 1 {
diff --git a/pkg/util/uri/uri_test.go b/pkg/util/uri/uri_test.go
index 4000bf3..49ecb0c 100644
--- a/pkg/util/uri/uri_test.go
+++ b/pkg/util/uri/uri_test.go
@@ -180,3 +180,70 @@ func TestCamelURIFormat(t *testing.T) {
                })
        }
 }
+
+func TestPathSegment(t *testing.T) {
+       tests := []struct {
+               uri      string
+               pos      int
+               expected string
+       }{
+               {
+                       uri: "direct:endpoint",
+                       pos: 0,
+               },
+               {
+                       uri: "direct:endpoint",
+                       pos: 12,
+               },
+               {
+                       uri: "kamelet:endpoint/",
+                       pos: 0,
+               },
+               {
+                       uri:      "kamelet:endpoint/s",
+                       pos:      0,
+                       expected: "s",
+               },
+               {
+                       uri: "kamelet:endpoint/s",
+                       pos: 1,
+               },
+               {
+                       uri:      "kamelet://endpoint/s",
+                       pos:      0,
+                       expected: "s",
+               },
+               {
+                       uri:      "kamelet://endpoint/s/p",
+                       pos:      0,
+                       expected: "s",
+               },
+               {
+                       uri:      "kamelet://endpoint/s/p",
+                       pos:      1,
+                       expected: "p",
+               },
+               {
+                       uri:      "kamelet://endpoint/s/p?param=n",
+                       pos:      1,
+                       expected: "p",
+               },
+               {
+                       uri:      "kamelet://endpoint/s/p?param=n&p2=n2",
+                       pos:      1,
+                       expected: "p",
+               },
+               {
+                       uri: "kamelet://endpoint/s/p?param=n&p2=n2",
+                       pos: 2,
+               },
+       }
+
+       for _, test := range tests {
+               thetest := test
+               t.Run(thetest.uri, func(t *testing.T) {
+                       param := GetPathSegment(thetest.uri, thetest.pos)
+                       assert.Equal(t, thetest.expected, param)
+               })
+       }
+}

Reply via email to