This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 5a8524c0ce7199f512105497d0a2791fe5db997b
Author: lburgazzoli <lburgazz...@gmail.com>
AuthorDate: Tue May 19 15:04:03 2020 +0200

    Improve knative broker integration (add support for dynamic event type) #326
---
 .../component/knative/http/KnativeHttpTest.java    | 57 ++++++++++++++++++++++
 .../camel/component/knative/KnativeComponent.java  |  2 +-
 .../knative/ce/AbstractCloudEventProcessor.java    |  7 ++-
 3 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index a4a55a6..edb81cc 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -1580,5 +1580,62 @@ public class KnativeHttpTest {
             server.stop();
         }
     }
+
+    @ParameterizedTest
+    @EnumSource(CloudEvents.class)
+    void testDynamicEventBridge(CloudEvent ce) throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port);
+
+        configureKnativeComponent(
+            context,
+            ce,
+            event(
+                Knative.EndpointKind.sink,
+                "default",
+                "localhost",
+                port,
+                mapOf(
+                    Knative.CONTENT_TYPE, "text/plain"
+                )),
+            sourceEvent(
+                "event.source",
+                mapOf(
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        );
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:event/event.source")
+                .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("event.sink")
+                .to("knative:event");
+        });
+
+        context.start();
+
+        try {
+            server.start();
+
+            given()
+                .body("test")
+                .header(Exchange.CONTENT_TYPE, "text/plain")
+                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
+                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source")
+                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
+                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+            .when()
+                .post()
+            .then()
+                .statusCode(204);
+
+            HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink");
+            assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        } finally {
+            server.stop();
+        }
+    }
 }
 
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 9cbfa6b..8b30ee6 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -215,7 +215,7 @@ public class KnativeComponent extends DefaultComponent {
             throw new IllegalArgumentException("Expecting URI in the form of: 'knative:type/name', got '" + uri + "'");
         }
 
-        final String type = StringHelper.before(remaining, "/");
+        final String type = ObjectHelper.supplyIfEmpty(StringHelper.before(remaining, "/"), () -> remaining);
         final String name = StringHelper.after(remaining, "/");
         final KnativeConfiguration conf = getKnativeConfiguration();
 
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index d5a73da..8c9be63 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -93,12 +93,11 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor {
             headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
 
             //
-            // in case of events, the type of the event defined as URI param so we need
+            // in case of events, if the type of the event is defined as URI param so we need
             // to override it to avoid the event type be overridden by Messages's headers
             //
-            if (endpoint.getType() == Knative.Type.event) {
-                Object eventType = headers.get(CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
-
+            if (endpoint.getType() == Knative.Type.event && endpoint.getName() != null) {
+                final Object eventType = headers.get(CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
                 if (eventType != null) {
                     logger.debug("Detected the presence of {} header with value {}: it will be ignored and replaced by value set as uri parameter {}",
                         CloudEvent.CAMEL_CLOUD_EVENT_TYPE,

Reply via email to